Files
mostovik-backend/docs/RUKOVODSTVO_PROGRAMMISTA.md

38 KiB
Raw Permalink Blame History

РУКОВОДСТВО СИСТЕМНОГО ПРОГРАММИСТА

Система ETL MOSTOVIK

Версия документа: 1.0
Дата: 2026-01-21


СОДЕРЖАНИЕ

  1. Общие сведения
  2. Структура проекта
  3. Технологический стек
  4. Конфигурация и зависимости
  5. Модели данных
  6. Сервисы и бизнес-логика
  7. Задачи Celery
  8. Клиенты и парсеры
  9. API и представители
  10. Тестирование
  11. Разработка и отладка
  12. Расширение функциональности

1. ОБЩИЕ СВЕДЕНИЯ

1.1. Назначение документа

Руководство содержит техническую информацию для разработчиков и системных программистов, работающих с системой MOSTOVIK.

1.2. Описание системы

MOSTOVIK — ETL-система для сбора, обработки и хранения данных из государственных источников:

  • Минпромторг (minpromtorg.gov.ru) — сертификаты промышленного производства, реестр производителей
  • Проверки.гов.ру (proverki.gov.ru) — данные о проверках (ФЗ-294, ФЗ-248)
  • ФНС — бухгалтерская отчётность

1.3. Основные возможности

  • Автоматический парсинг данных через Celery
  • Отслеживание прогресса задач (BackgroundJob)
  • Логирование всех операций (ParserLoadLog)
  • Повторные попытки при ошибках
  • Потоковая обработка больших файлов
  • Дедупликация данных

2. СТРУКТУРА ПРОЕКТА

mostovik-backend/
├── src/                        # Исходный код Django
│   ├── config/                # Конфигурация Django
│   │   ├── settings/          # Настройки (base, dev, prod, test)
│   │   │   ├── base.py       # Базовая конфигурация
│   │   │   ├── dev.py        # Разработка
│   │   │   ├── production.py # Production
│   │   │   └── test.py       # Тесты
│   │   ├── celery.py         # Конфигурация Celery
│   │   ├── urls.py           # Корневые URL
│   │   ├── api_v1_urls.py    # Маршруты API
│   │   ├── wsgi.py           # Точка входа WSGI
│   │   └── asgi.py           # Точка входа ASGI
│   ├── apps/                  # Приложения Django
│   │   ├── core/             # Базовые компоненты
│   │   │   ├── models.py    # Базовые модели (TimestampMixin)
│   │   │   ├── services.py  # BackgroundJobService
│   │   │   ├── tasks.py     # Общие задачи
│   │   │   ├── views.py     # Базовые классы представлений
│   │   │   ├── serializers.py
│   │   │   ├── filters.py
│   │   │   ├── pagination.py
│   │   │   ├── permissions.py
│   │   │   └── middleware.py
│   │   ├── parsers/          # Парсеры данных
│   │   │   ├── models.py    # Модели данных
│   │   │   ├── services.py  # Бизнес-логика
│   │   │   ├── tasks.py     # Задачи Celery
│   │   │   ├── clients/     # API-клиенты
│   │   │   │   ├── minpromtorg/
│   │   │   │   ├── proverki/
│   │   │   │   ├── zakupki/
│   │   │   │   └── fns/
│   │   │   ├── admin.py     # Django admin
│   │   │   ├── urls.py      # Маршруты
│   │   │   ├── views.py     # Классы представлений
│   │   │   └── serializers.py
│   │   └── user/            # Пользователи
│   │       ├── models.py
│   │       ├── admin.py
│   │       └── ...
│   └── manage.py            # Утилита управления Django
├── tests/                    # Тесты
│   ├── apps/
│   │   ├── user/
│   │   └── parsers/
│   ├── conftest.py          # Конфигурация pytest
│   └── factories.py         # Фабрики Factory Boy
├── deploy/                   # Развёртывание
│   ├── scripts/             # Скрипты
│   ├── systemd/             # Файлы systemd
│   └── apache/              # Конфигурация Apache
├── docker/                   # Конфигурация Docker
├── scripts/                  # Вспомогательные скрипты
├── logs/                     # Логи
├── .env.example             # Пример окружения
├── pyproject.toml           # Зависимости и конфигурация
├── requirements.txt         # Производственные зависимости
├── requirements-dev.txt     # Зависимости для разработки
└── Makefile                 # Команды разработки

3. ТЕХНОЛОГИЧЕСКИЙ СТЕК

3.1. Основные технологии

Компонент Технология Версия
Язык Python 3.11
Фреймворк Django 3.2.25
API Django REST Framework 3.14.0
БД PostgreSQL 15.10
Кеш Redis 7.x
Очереди Celery 5.3.6
Веб-сервер Gunicorn + Apache 21.2.0 / 2.4.57

3.2. Библиотеки для парсинга

Назначение Библиотека Версия
Автоматизация браузера Playwright 1.57.0+
Web scraping Scrapy 2.11.2
Browser automation Selenium 4.17.2
HTML parsing BeautifulSoup4 4.12.3
HTTP-запросы requests 2.31.0

3.3. Обработка данных

Назначение Библиотека Версия
Таблицы pandas 2.0.3
Excel openpyxl 3.1.5+
Word python-docx 1.2.0+
CSV built-in -

3.4. Инструменты разработки

Назначение Инструмент Версия
Тестирование pytest 7.4.4
Покрытие coverage 7.4.0
Linting ruff 0.1.14
Форматирование black 23.12.1
Сортировка импортов isort 5.13.2
Проверка типов mypy 1.8.0
Безопасность bandit 1.7.5

4. КОНФИГУРАЦИЯ И ЗАВИСИМОСТИ

4.1. Установка зависимостей

# Через uv (рекомендуется)
uv pip install -e ".[dev]"

# Или через requirements
uv pip install -r requirements.txt
uv pip install -r requirements-dev.txt

4.2. Конфигурация окружения

cp .env.example .env

Ключевые переменные для разработки:

# Django
DJANGO_SETTINGS_MODULE=config.settings.dev
DEBUG=True
SECRET_KEY=django-insecure-dev-key

# База данных
POSTGRES_DB=mostovik
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_HOST=127.0.0.1
POSTGRES_PORT=5432

# Redis
REDIS_URL=redis://127.0.0.1:6379/0
CELERY_BROKER_URL=redis://127.0.0.1:6379/0

# API парсеров
ZAKUPKI_TOKEN=<token>
CHECKO_API_KEY=<key>

4.3. Конфигурация инструментов (pyproject.toml)

[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "config.settings.test"
testpaths = ["tests"]
addopts = ["--verbose", "--tb=short", "--reuse-db"]

[tool.ruff]
line-length = 88
target-version = "py311"

[tool.black]
line-length = 88
target-version = ['py311']

[tool.mypy]
python_version = "3.11"
check_untyped_defs = true
plugins = ["mypy_django_plugin.main"]

5. МОДЕЛИ ДАННЫХ

5.1. Базовые модели (apps.core)

TimestampMixin — миксин для автоматического добавления полей времени:

class TimestampMixin(models.Model):
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        abstract = True

5.2. Модели парсеров (apps.parsers)

ParserLoadLog

Лог загрузок парсеров:

class ParserLoadLog(TimestampMixin, models.Model):
    class Source(models.TextChoices):
        INDUSTRIAL = "industrial", "Промышленное производство"
        MANUFACTURES = "manufactures", "Реестр производителей"
        INSPECTIONS = "inspections", "Единый реестр проверок"
        PROCUREMENTS = "procurements", "Госзакупки"
        FNS_REPORTS = "fns_reports", "Бухгалтерская отчётность ФНС"

    batch_id = models.PositiveIntegerField(db_index=True)
    source = models.CharField(max_length=50, choices=Source.choices, db_index=True)
    records_count = models.PositiveIntegerField(default=0)
    status = models.CharField(max_length=20, default="success")
    error_message = models.TextField(blank=True)

    class Meta:
        db_table = "parsers_load_log"
        constraints = [
            models.UniqueConstraint(
                fields=["source", "batch_id"],
                name="unique_load_batch_per_source",
            ),
        ]

IndustrialCertificateRecord

Сертификат промышленного производства:

class IndustrialCertificateRecord(TimestampMixin, models.Model):
    load_batch = models.PositiveIntegerField(db_index=True)
    issue_date = models.CharField(max_length=15, blank=True)
    certificate_number = models.CharField(max_length=100, db_index=True)
    expiry_date = models.CharField(max_length=15, blank=True)
    certificate_file_url = models.TextField(blank=True)
    organisation_name = models.TextField()
    inn = models.CharField(max_length=20, db_index=True)
    ogrn = models.CharField(max_length=20, db_index=True)

    class Meta:
        db_table = "parsers_industrial_certificate"
        constraints = [
            models.UniqueConstraint(
                fields=["certificate_number"],
                name="unique_certificate_number",
            ),
        ]

ManufacturerRecord

Производитель из реестра Минпромторга:

class ManufacturerRecord(TimestampMixin, models.Model):
    load_batch = models.PositiveIntegerField(db_index=True)
    inn = models.CharField(max_length=20, db_index=True)
    ogrn = models.CharField(max_length=20, db_index=True)
    manufacturer_name = models.TextField()
    # ... другие поля

InspectionRecord

Запись о проверке (proverki.gov.ru):

class InspectionRecord(TimestampMixin, models.Model):
    class InspectionType(models.TextChoices):
        FZ294 = "294", "ФЗ-294 (традиционные)"
        FZ248 = "248", "ФЗ-248 (новые)"

    load_batch = models.PositiveIntegerField(db_index=True)
    inspection_id = models.CharField(max_length=100, unique=True)
    inspection_type = models.CharField(max_length=3, choices=InspectionType.choices)
    data_year = models.PositiveIntegerField()
    data_month = models.PositiveIntegerField()
    # ... другие поля

FinancialReport

Бухгалтерский отчёт (ФНС):

class FinancialReport(TimestampMixin, models.Model):
    class SourceType(models.TextChoices):
        FILE_WATCH = "file_watch", "Мониторинг папок"
        MANUAL = "manual", "Ручная загрузка"

    external_id = models.CharField(max_length=100, unique=True)
    ogrn = models.CharField(max_length=20, db_index=True)
    file_name = models.CharField(max_length=255)
    file_hash = models.CharField(max_length=64, db_index=True)
    source = models.CharField(max_length=20, choices=SourceType.choices)
    batch_id = models.PositiveIntegerField(db_index=True)

5.3. Модели ядра (apps.core)

BackgroundJob

Отслеживание прогресса задач:

class BackgroundJob(TimestampMixin, models.Model):
    class Status(models.TextChoices):
        PENDING = "pending", "Ожидание"
        RUNNING = "running", "Выполняется"
        COMPLETED = "completed", "Завершено"
        FAILED = "failed", "Ошибка"

    task_id = models.CharField(max_length=100, unique=True)
    task_name = models.CharField(max_length=255)
    status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING)
    progress = models.PositiveIntegerField(default=0)  # 0-100
    message = models.TextField(blank=True)
    meta = models.JSONField(default=dict, blank=True)
    result = models.JSONField(default=dict, blank=True)
    error = models.TextField(blank=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

6. СЕРВИСЫ И БИЗНЕС-ЛОГИКА

6.1. BackgroundJobService (apps.core)

Управление фоновыми задачами:

class BackgroundJobService:
    @classmethod
    def create_job(
        cls,
        task_id: str,
        task_name: str,
        meta: dict | None = None,
    ) -> BackgroundJob:
        """Создать новую задачу."""
        return BackgroundJob.objects.create(
            task_id=task_id,
            task_name=task_name,
            meta=meta or {},
        )

    @classmethod
    def get_by_task_id(cls, task_id: str) -> BackgroundJob | None:
        """Получить задачу по ID."""
        return BackgroundJob.objects.filter(task_id=task_id).first()

Методы:

Метод Описание
create_job() Создание новой задачи
get_by_task_id() Получение задачи по ID
mark_started() Установка статуса "running"
update_progress() Обновление прогресса (%)
complete() Завершение задачи
fail() Пометка как ошибочной

6.2. ParserLoadLogService (apps.parsers)

Логирование загрузок:

class ParserLoadLogService:
    @classmethod
    def create_load_log_with_next_batch_id(
        cls,
        source: ParserLoadLog.Source,
        status: str = "in_progress",
    ) -> tuple[ParserLoadLog, int]:
        """Создать лог загрузки со следующим batch_id."""
        last_log = ParserLoadLog.objects.filter(source=source).order_by('-batch_id').first()
        next_batch_id = (last_log.batch_id + 1) if last_log else 1

        log = ParserLoadLog.objects.create(
            batch_id=next_batch_id,
            source=source,
            status=status,
        )
        return log, next_batch_id

    @classmethod
    def update(
        cls,
        log: ParserLoadLog,
        status: str | None = None,
        records_count: int | None = None,
        error_message: str | None = None,
    ) -> None:
        """Обновить лог загрузки."""
        # ...

    @classmethod
    def mark_failed(cls, log: ParserLoadLog, error: str) -> None:
        """Пометить как неудачную."""
        # ...

6.3. Сервисы парсеров

IndustrialCertificateService

class IndustrialCertificateService:
    @classmethod
    def save_certificates(
        cls,
        certificates: list[dict],
        batch_id: int,
    ) -> int:
        """Сохранить сертификаты с дедупликацией."""
        saved = 0
        for cert_data in certificates:
            obj, created = IndustrialCertificateRecord.objects.update_or_create(
                certificate_number=cert_data['certificate_number'],
                defaults={**cert_data, 'load_batch': batch_id},
            )
            if created:
                saved += 1
        return saved

InspectionService

class InspectionService:
    @classmethod
    def get_last_loaded_period(
        cls,
        is_federal_law_248: bool,
    ) -> tuple[int | None, int | None]:
        """Получить последний загруженный период."""
        last_log = InspectionRecord.objects.filter(
            is_federal_law_248=is_federal_law_248
        ).order_by('-data_year', '-data_month').first()

        if last_log:
            return last_log.data_year, last_log.data_month
        return None, None

    @classmethod
    def save_inspections(
        cls,
        inspections: list[dict],
        batch_id: int,
        is_federal_law_248: bool = False,
        data_year: int | None = None,
        data_month: int | None = None,
    ) -> int:
        """Сохранить данные о проверках."""
        # ...

FNSReportService

class FNSReportService:
    @classmethod
    def exists_by_hash(cls, file_hash: str) -> bool:
        """Проверить существование файла по хешу."""
        return FinancialReport.objects.filter(file_hash=file_hash).exists()

    @classmethod
    def save_report(
        cls,
        external_id: str,
        ogrn: str,
        file_name: str,
        file_hash: str,
        source: FinancialReport.SourceType,
        batch_id: int,
        lines_data: list[dict],
    ) -> FinancialReport:
        """Сохранить отчёт со строками."""
        # ...

7. ЗАДАЧИ CELERY

7.1. Конфигурация (config/celery.py)

app = Celery("project")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

app.conf.beat_schedule = {
    "parse-industrial-production-daily": {
        "task": "apps.parsers.tasks.parse_industrial_production",
        "schedule": 86400.0,  # Каждые 24 часа
    },
    "parse-manufactures-daily": {
        "task": "apps.parsers.tasks.parse_manufactures",
        "schedule": 86400.0,
    },
    "scan-fns-directory": {
        "task": "apps.parsers.tasks.scan_fns_directory",
        "schedule": 300.0,  # Каждые 5 минут
    },
}

7.2. Основные задачи

parse_industrial_production

@shared_task(bind=True)
def parse_industrial_production(
    self,
    proxies: list[str] | None = None,
    client_adapter: BaseAdapter | None = None,
) -> dict:
    """Парсинг сертификатов Минпромторга."""
    source = ParserLoadLog.Source.INDUSTRIAL
    load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(
        source=source, status="in_progress"
    )
    task_id = self.request.id or str(uuid.uuid4())

    # Создание BackgroundJob
    job = BackgroundJobService.create_job(
        task_id=task_id,
        task_name="apps.parsers.tasks.parse_industrial_production",
        meta={"source": source, "batch_id": batch_id},
    )
    job.mark_started()

    try:
        # Парсинг
        if proxies is None:
            proxies = ProxyService.get_active_proxies_or_none()

        with IndustrialProductionClient(proxies=proxies) as client:
            certificates = client.fetch_certificates()

        # Сохранение
        saved_count = IndustrialCertificateService.save_certificates(
            certificates, batch_id=batch_id
        )

        ParserLoadLogService.update(load_log, status="success", records_count=saved_count)
        job.complete(result={"batch_id": batch_id, "saved": saved_count})

        return {"batch_id": batch_id, "saved": saved_count, "status": "success"}

    except Exception as e:
        ParserLoadLogService.mark_failed(load_log, str(e))
        job.fail(error=str(e))
        return {"status": "failed", "error": str(e)}

sync_inspections

Автоматическая синхронизация проверок:

@shared_task(bind=True)
def sync_inspections(
    self,
    proxies: list[str] | None = None,
    use_playwright: bool | None = None,
    current_year: int | None = None,
    current_month: int | None = None,
) -> dict:
    """Синхронизация данных о проверках.

    Логика:
    1. Проверяет последнюю загруженную дату в БД
    2. Если данных нет — начинает с 01.01.2025
    3. Загружает месяц за месяцем до текущего
    4. Загружает оба типа проверок (ФЗ-294 и ФЗ-248)
    5. При отсутствии данных (2 пустых месяца) — прекращает
    """
    # ...

process_fns_file

@shared_task(bind=True)
def process_fns_file(self, file_path: str | Path) -> dict:
    """Обработка файла ФНС."""
    task_id = self.request.id
    file_path = Path(file_path)

    # Создание BackgroundJob
    job = BackgroundJobService.create_job(
        task_id=task_id,
        task_name="apps.parsers.tasks.process_fns_file",
        meta={"file": file_path.name},
    )
    job.mark_started()

    try:
        # Проверка дубликата
        file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
        if FNSReportService.exists_by_hash(file_hash):
            job.complete(result={"status": "skipped", "reason": "duplicate"})
            return {"status": "skipped"}

        # Парсинг Excel
        parsed = FNSExcelParser.parse_file(file_path)

        # Сохранение
        report = FNSReportService.save_report(
            external_id=parsed.external_id,
            ogrn=parsed.ogrn,
            file_name=file_path.name,
            file_hash=file_hash,
            source=FinancialReport.SourceType.FILE_WATCH,
            lines_data=[asdict(line) for line in parsed.lines],
        )

        job.complete(result={"report_id": report.id})
        return {"status": "success"}

    except Exception as e:
        job.fail(error=str(e))
        return {"status": "failed", "error": str(e)}

7.3. Вызов задач

# Асинхронно
from apps.parsers.tasks import parse_industrial_production

result = parse_industrial_production.delay()
print(result.id)  # ID задачи

# С параметрами
result = parse_inspections.delay(year=2025, month=10, is_federal_law_248=False)

# Синхронно (для тестов)
from apps.parsers.tasks import parse_manufactures
result = parse_manufactures.apply(kwargs={"proxies": [...]})

8. КЛИЕНТЫ И ПАРСЕРЫ

8.1. Структура клиентов

apps/parsers/clients/
├── minpromtorg/
│   ├── __init__.py
│   ├── base.py           # Базовый класс
│   ├── industrial_production.py
│   └── manufactures.py
├── proverki/
│   ├── __init__.py
│   └── proverki_client.py
├── zakupki/
│   └── zakupki_client.py
└── fns/
    ├── __init__.py
    ├── parser.py         # FNSExcelParser
    └── directory_scanner.py

8.2. Клиент Минпромторга

class IndustrialProductionClient:
    """Клиент для API Минпромторга (сертификаты)."""

    BASE_URL = "https://minpromtorg.gov.ru/api"

    def __init__(
        self,
        proxies: list[str] | None = None,
        http_adapter: BaseAdapter | None = None,
    ):
        self.session = requests.Session()
        if proxies:
            self.session.proxies = {"http": proxies[0], "https": proxies[0]}
        if http_adapter:
            self.session.mount("http://", http_adapter)

    def fetch_certificates(self) -> list[dict]:
        """Получить список сертификатов."""
        response = self.session.get(f"{self.BASE_URL}/certificates")
        response.raise_for_status()
        return response.json()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.session.close()

8.3. Клиент proverki.gov.ru

class ProverkiClient:
    """Клиент для proverki.gov.ru с поддержкой Playwright."""

    def __init__(
        self,
        proxies: list[str] | None = None,
        use_playwright: bool = True,
    ):
        self.proxies = proxies
        self.use_playwright = use_playwright
        self.browser = None
        self.page = None

    def _init_browser(self):
        """Инициализация Playwright."""
        from playwright.sync_api import sync_playwright

        playwright = sync_playwright().start()
        self.browser = playwright.chromium.launch(headless=True)
        self.page = self.browser.new_page()

    def fetch_inspections(
        self,
        year: int | None = None,
        month: int | None = None,
        file_url: str | None = None,
        progress_callback: Callable | None = None,
    ) -> list[dict]:
        """Получить данные о проверках."""
        if self.use_playwright:
            self._init_browser()
            return self._fetch_with_playwright(year, month, file_url, progress_callback)
        else:
            return self._fetch_with_requests(year, month, file_url)

    def _fetch_with_playwright(...) -> list[dict]:
        """Парсинг через Playwright (JS-rendering)."""
        self.page.goto("https://proverki.gov.ru")
        # ... навигация и парсинг
        return inspections

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.browser:
            self.browser.close()

8.4. Парсер ФНС

class FNSExcelParser:
    """Парсер Excel-файлов ФНС."""

    @staticmethod
    def parse_file(file_path: Path) -> ParsedData:
        """Разобрать Excel-файл."""
        import pandas as pd

        df = pd.read_excel(file_path, sheet_name=0)

        # Извлечение данных
        external_id = df.iloc[0, 0]  # Пример
        ogrn = df.iloc[1, 0]

        lines = []
        for _, row in df.iterrows():
            lines.append(FNSLine(
                period=row['period'],
                revenue=row['revenue'],
                # ...
            ))

        return ParsedData(
            external_id=external_id,
            ogrn=ogrn,
            lines=lines,
        )

9. API И ПРЕДСТАВИТЕЛИ

9.1. Сериализаторы

# apps/parsers/serializers.py

class IndustrialCertificateSerializer(serializers.ModelSerializer):
    class Meta:
        model = IndustrialCertificateRecord
        fields = [
            'id', 'certificate_number', 'issue_date', 'expiry_date',
            'organisation_name', 'inn', 'ogrn', 'created_at',
        ]
        read_only_fields = ['id', 'created_at']


class ParserLoadLogSerializer(serializers.ModelSerializer):
    class Meta:
        model = ParserLoadLog
        fields = [
            'id', 'batch_id', 'source', 'records_count',
            'status', 'error_message', 'created_at',
        ]

9.2. Классы представлений

# apps/parsers/views.py

from rest_framework import viewsets, permissions
from apps.parsers.models import IndustrialCertificateRecord
from apps.parsers.serializers import IndustrialCertificateSerializer

class IndustrialCertificateViewSet(viewsets.ReadOnlyModelViewSet):
    """Только чтение для сертификатов."""
    queryset = IndustrialCertificateRecord.objects.all()
    serializer_class = IndustrialCertificateSerializer
    permission_classes = [permissions.IsAuthenticated]
    filter_backends = [filters.SearchFilter, filters.OrderingFilter]
    search_fields = ['organisation_name', 'certificate_number']
    ordering_fields = ['created_at', 'issue_date']

9.3. Маршруты (URLs)

# apps/parsers/urls.py

from rest_framework.routers import DefaultRouter
from apps.parsers.views import IndustrialCertificateViewSet

router = DefaultRouter()
router.register('certificates', IndustrialCertificateViewSet, basename='certificate')

urlpatterns = router.urls
# config/api_v1_urls.py

from django.urls import path, include

urlpatterns = [
    path('parsers/', include('apps.parsers.urls')),
    # ...
]

10. ТЕСТИРОВАНИЕ

10.1. Конфигурация (pytest)

# tests/conftest.py

import pytest
from django.conf import settings

@pytest.fixture(autouse=True)
def enable_db_access_for_all_tests(db):
    pass

@pytest.fixture
def api_client():
    from rest_framework.test import APIClient
    return APIClient()

@pytest.fixture
def user():
    from apps.user.models import User
    return User.objects.create_user(email='test@test.com', password='test')

10.2. Пример теста

# tests/apps/parsers/test_services.py

import pytest
from model_bakery import baker
from apps.parsers.services import IndustrialCertificateService
from apps.parsers.models import IndustrialCertificateRecord

@pytest.mark.django_db
class TestIndustrialCertificateService:

    def test_save_certificates(self):
        certificates = [
            {
                'certificate_number': 'CERT-001',
                'issue_date': '2025-01-01',
                'organisation_name': 'ООО Ромашка',
                'inn': '1234567890',
                'ogrn': '1234567890123',
            }
        ]

        saved = IndustrialCertificateService.save_certificates(certificates, batch_id=1)

        assert saved == 1
        assert IndustrialCertificateRecord.objects.count() == 1

    def test_save_certificates_duplicate(self):
        # Создать дубликат
        baker.make(
            IndustrialCertificateRecord,
            certificate_number='CERT-001'
        )

        certificates = [
            {
                'certificate_number': 'CERT-001',
                'issue_date': '2025-01-01',
                'organisation_name': 'ООО Ромашка',
                'inn': '1234567890',
                'ogrn': '1234567890123',
            }
        ]

        saved = IndustrialCertificateService.save_certificates(certificates, batch_id=2)

        assert saved == 0  # Не сохранён (дубликат)
        assert IndustrialCertificateRecord.objects.count() == 1

10.3. Тесты задач Celery

# tests/apps/parsers/test_tasks.py

import pytest
from apps.parsers.tasks import parse_industrial_production

@pytest.mark.django_db
class TestParseTasks:

    def test_parse_industrial_production(self, mocker):
        # Мок клиента
        mock_client = mocker.patch(
            'apps.parsers.tasks.IndustrialProductionClient'
        )
        mock_client.return_value.__enter__.return_value.fetch_certificates.return_value = []

        result = parse_industrial_production.delay()

        assert result.get()['status'] == 'success'

10.4. Команды тестирования

# Все тесты
make test

# С покрытием
make test-cov

# Только быстрые (без медленных)
make test-fast TARGET="--fast"

# Параллельно
make test-parallel

# Конкретный модуль
python -m pytest tests/apps/parsers/test_services.py -v

11. РАЗРАБОТКА И ОТЛАДКА

11.1. Локальный запуск

# Миграции
cd src
python manage.py makemigrations
python manage.py migrate

# Создание суперпользователя
python manage.py createsuperuser

# Запуск сервера разработки
python manage.py runserver

# Запуск Celery worker (в отдельном терминале)
celery -A config worker --loglevel=info

# Запуск Celery beat
celery -A config beat --loglevel=info

11.2. Отладка Celery

# Worker с отладкой
celery -A config worker --loglevel=debug --pool=solo

# Проверка очереди
celery -A config inspect active
celery -A config inspect registered

# Очистка очереди
celery -A config purge

11.3. Django Debug Toolbar

# Установка (уже в requirements-dev.txt)
uv pip install django-debug-toolbar

# Добавить в settings/dev.py:
INSTALLED_APPS += ['debug_toolbar']
MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']

11.4. Логирование

# config/settings/dev.py

LOGGING = {
    'version': 1,
    'handlers': {
        'console': {'class': 'logging.StreamHandler'},
        'file': {
            'class': 'logging.FileHandler',
            'filename': 'logs/debug.log',
        },
    },
    'loggers': {
        'apps.parsers': {
            'handlers': ['console', 'file'],
            'level': 'DEBUG',
        },
    },
}

12. РАСШИРЕНИЕ ФУНКЦИОНАЛЬНОСТИ

12.1. Добавление нового парсера

  1. Создать модель:
# apps/parsers/models.py

class NewSourceRecord(TimestampMixin, models.Model):
    load_batch = models.PositiveIntegerField(db_index=True)
    # ... поля

    class Meta:
        db_table = "parsers_new_source"
        constraints = [
            models.UniqueConstraint(
                fields=["unique_field"],
                name="unique_new_source",
            ),
        ]
  1. Создать сервис:
# apps/parsers/services.py

class NewSourceService:
    @classmethod
    def save_records(cls, records: list[dict], batch_id: int) -> int:
        saved = 0
        for data in records:
            obj, created = NewSourceRecord.objects.update_or_create(
                unique_field=data['unique_field'],
                defaults={**data, 'load_batch': batch_id},
            )
            if created:
                saved += 1
        return saved
  1. Создать клиента:
# apps/parsers/clients/new_source.py

class NewSourceClient:
    def __init__(self, proxies=None):
        self.session = requests.Session()

    def fetch_data(self) -> list[dict]:
        response = self.session.get("https://api.example.com/data")
        return response.json()
  1. Создать задачу Celery:
# apps/parsers/tasks.py

@shared_task(bind=True)
def parse_new_source(self, proxies=None):
    source = ParserLoadLog.Source.NEW_SOURCE
    load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(source)

    job = BackgroundJobService.create_job(
        task_id=self.request.id,
        task_name="parse_new_source",
        meta={"source": source},
    )
    job.mark_started()

    try:
        with NewSourceClient(proxies=proxies) as client:
            records = client.fetch_data()

        saved = NewSourceService.save_records(records, batch_id)
        ParserLoadLogService.update(load_log, status="success", records_count=saved)
        job.complete(result={"saved": saved})
        return {"status": "success"}
    except Exception as e:
        ParserLoadLogService.mark_failed(load_log, str(e))
        job.fail(error=str(e))
        return {"status": "failed"}
  1. Добавить в admin:
# apps/parsers/admin.py

@admin.register(NewSourceRecord)
class NewSourceRecordAdmin(admin.ModelAdmin):
    list_display = ['id', 'unique_field', 'load_batch', 'created_at']
    list_filter = ['created_at']
    search_fields = ['unique_field']

12.2. Добавление периодической задачи

# config/celery.py

app.conf.beat_schedule = {
    # ... существующие
    "parse-new-source-hourly": {
        "task": "apps.parsers.tasks.parse_new_source",
        "schedule": 3600.0,  # Каждый час
    },
}

12.3. Миграции

cd src
python manage.py makemigrations parsers
python manage.py migrate

ПРИЛОЖЕНИЕ А. ПОЛЕЗНЫЕ КОМАНДЫ

# Создание миграций
python manage.py makemigrations
python manage.py migrate

# Django shell
python manage.py shell

# Проверка кода
ruff check src/
black --check src/
mypy src/

# Форматирование
black src/
isort src/

# Безопасность
bandit -r src/

# Тесты
pytest tests/ -v
pytest tests/ --cov=src/

ПРИЛОЖЕНИЕ Б. СТРУКТУРА БД

-- Основные таблицы

parsers_load_log              -- Логи загрузок
parsers_industrial_certificate -- Сертификаты
parsers_manufacturer_record   -- Производители
parsers_inspection_record     -- Проверки
parsers_financial_report      -- Отчёты ФНС
core_backgroundjob            -- Задачи
auth_user                     -- Пользователи

Документ составлен на основе версии кода от 2026-01-21