# РУКОВОДСТВО СИСТЕМНОГО ПРОГРАММИСТА ## Система ETL MOSTOVIK Версия документа: 1.0 Дата: 2026-01-21 --- ## СОДЕРЖАНИЕ 1. [Общие сведения](#1-общие-сведения) 2. [Структура проекта](#2-структура-проекта) 3. [Технологический стек](#3-технологический-стек) 4. [Конфигурация и зависимости](#4-конфигурация-и-зависимости) 5. [Модели данных](#5-модели-данных) 6. [Сервисы и бизнес-логика](#6-сервисы-и-бизнес-логика) 7. [Задачи Celery](#7-задачи-celery) 8. [Клиенты и парсеры](#8-клиенты-и-парсеры) 9. [API и представители](#9-api-и-представители) 10. [Тестирование](#10-тестирование) 11. [Разработка и отладка](#11-разработка-и-отладка) 12. [Расширение функциональности](#12-расширение-функциональности) --- ## 1. ОБЩИЕ СВЕДЕНИЯ ### 1.1. Назначение документа Руководство содержит техническую информацию для разработчиков и системных программистов, работающих с системой MOSTOVIK. ### 1.2. Описание системы MOSTOVIK — ETL-система для сбора, обработки и хранения данных из государственных источников: - **Минпромторг** (minpromtorg.gov.ru) — сертификаты промышленного производства, реестр производителей - **Проверки.гов.ру** (proverki.gov.ru) — данные о проверках (ФЗ-294, ФЗ-248) - **ФНС** — бухгалтерская отчётность ### 1.3. Основные возможности - Автоматический парсинг данных через Celery - Отслеживание прогресса задач (BackgroundJob) - Логирование всех операций (ParserLoadLog) - Повторные попытки при ошибках - Потоковая обработка больших файлов - Дедупликация данных --- ## 2. СТРУКТУРА ПРОЕКТА ``` mostovik-backend/ ├── src/ # Исходный код Django │ ├── config/ # Конфигурация Django │ │ ├── settings/ # Настройки (base, dev, prod, test) │ │ │ ├── base.py # Базовая конфигурация │ │ │ ├── dev.py # Разработка │ │ │ ├── production.py # Production │ │ │ └── test.py # Тесты │ │ ├── celery.py # Конфигурация Celery │ │ ├── urls.py # Корневые URL │ │ ├── api_v1_urls.py # Маршруты API │ │ ├── wsgi.py # Точка входа WSGI │ │ └── asgi.py # Точка входа ASGI │ ├── apps/ # Приложения Django │ │ ├── core/ # Базовые компоненты │ │ │ ├── models.py # Базовые модели (TimestampMixin) │ │ │ ├── services.py # BackgroundJobService │ │ │ ├── tasks.py # Общие задачи │ │ │ ├── views.py # Базовые классы представлений │ │ │ ├── serializers.py │ │ │ ├── filters.py │ │ │ ├── pagination.py │ │ │ ├── permissions.py │ │ │ └── middleware.py │ │ ├── parsers/ # Парсеры данных │ │ │ ├── models.py # Модели данных │ │ │ ├── services.py # Бизнес-логика │ │ │ ├── tasks.py # Задачи Celery │ │ │ ├── clients/ # API-клиенты │ │ │ │ ├── minpromtorg/ │ │ │ │ ├── proverki/ │ │ │ │ ├── zakupki/ │ │ │ │ └── fns/ │ │ │ ├── admin.py # Django admin │ │ │ ├── urls.py # Маршруты │ │ │ ├── views.py # Классы представлений │ │ │ └── serializers.py │ │ └── user/ # Пользователи │ │ ├── models.py │ │ ├── admin.py │ │ └── ... │ └── manage.py # Утилита управления Django ├── tests/ # Тесты │ ├── apps/ │ │ ├── user/ │ │ └── parsers/ │ ├── conftest.py # Конфигурация pytest │ └── factories.py # Фабрики Factory Boy ├── deploy/ # Развёртывание │ ├── scripts/ # Скрипты │ ├── systemd/ # Файлы systemd │ └── apache/ # Конфигурация Apache ├── docker/ # Конфигурация Docker ├── scripts/ # Вспомогательные скрипты ├── logs/ # Логи ├── .env.example # Пример окружения ├── pyproject.toml # Зависимости и конфигурация ├── requirements.txt # Производственные зависимости ├── requirements-dev.txt # Зависимости для разработки └── Makefile # Команды разработки ``` --- ## 3. ТЕХНОЛОГИЧЕСКИЙ СТЕК ### 3.1. Основные технологии | Компонент | Технология | Версия | |-----------|-------------|---------| | Язык | Python | 3.11 | | Фреймворк | Django | 3.2.25 | | API | Django REST Framework | 3.14.0 | | БД | PostgreSQL | 15.10 | | Кеш | Redis | 7.x | | Очереди | Celery | 5.3.6 | | Веб-сервер | Gunicorn + Apache | 21.2.0 / 2.4.57 | ### 3.2. Библиотеки для парсинга | Назначение | Библиотека | Версия | |-------------|-----------|---------| | Автоматизация браузера | Playwright | 1.57.0+ | | Web scraping | Scrapy | 2.11.2 | | Browser automation | Selenium | 4.17.2 | | HTML parsing | BeautifulSoup4 | 4.12.3 | | HTTP-запросы | requests | 2.31.0 | ### 3.3. Обработка данных | Назначение | Библиотека | Версия | |-------------|-----------|---------| | Таблицы | pandas | 2.0.3 | | Excel | openpyxl | 3.1.5+ | | Word | python-docx | 1.2.0+ | | CSV | built-in | - | ### 3.4. Инструменты разработки | Назначение | Инструмент | Версия | |-------------|-----------|---------| | Тестирование | pytest | 7.4.4 | | Покрытие | coverage | 7.4.0 | | Linting | ruff | 0.1.14 | | Форматирование | black | 23.12.1 | | Сортировка импортов | isort | 5.13.2 | | Проверка типов | mypy | 1.8.0 | | Безопасность | bandit | 1.7.5 | --- ## 4. КОНФИГУРАЦИЯ И ЗАВИСИМОСТИ ### 4.1. Установка зависимостей ```bash # Через uv (рекомендуется) uv pip install -e ".[dev]" # Или через requirements uv pip install -r requirements.txt uv pip install -r requirements-dev.txt ``` ### 4.2. Конфигурация окружения ```bash cp .env.example .env ``` **Ключевые переменные для разработки:** ```ini # Django DJANGO_SETTINGS_MODULE=config.settings.dev DEBUG=True SECRET_KEY=django-insecure-dev-key # База данных POSTGRES_DB=mostovik POSTGRES_USER=postgres POSTGRES_PASSWORD=postgres POSTGRES_HOST=127.0.0.1 POSTGRES_PORT=5432 # Redis REDIS_URL=redis://127.0.0.1:6379/0 CELERY_BROKER_URL=redis://127.0.0.1:6379/0 # API парсеров ZAKUPKI_TOKEN= CHECKO_API_KEY= ``` ### 4.3. Конфигурация инструментов (pyproject.toml) ```toml [tool.pytest.ini_options] DJANGO_SETTINGS_MODULE = "config.settings.test" testpaths = ["tests"] addopts = ["--verbose", "--tb=short", "--reuse-db"] [tool.ruff] line-length = 88 target-version = "py311" [tool.black] line-length = 88 target-version = ['py311'] [tool.mypy] python_version = "3.11" check_untyped_defs = true plugins = ["mypy_django_plugin.main"] ``` --- ## 5. МОДЕЛИ ДАННЫХ ### 5.1. Базовые модели (apps.core) **TimestampMixin** — миксин для автоматического добавления полей времени: ```python class TimestampMixin(models.Model): created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) class Meta: abstract = True ``` ### 5.2. Модели парсеров (apps.parsers) #### ParserLoadLog Лог загрузок парсеров: ```python class ParserLoadLog(TimestampMixin, models.Model): class Source(models.TextChoices): INDUSTRIAL = "industrial", "Промышленное производство" MANUFACTURES = "manufactures", "Реестр производителей" INSPECTIONS = "inspections", "Единый реестр проверок" PROCUREMENTS = "procurements", "Госзакупки" FNS_REPORTS = "fns_reports", "Бухгалтерская отчётность ФНС" batch_id = models.PositiveIntegerField(db_index=True) source = models.CharField(max_length=50, choices=Source.choices, db_index=True) records_count = models.PositiveIntegerField(default=0) status = models.CharField(max_length=20, default="success") error_message = models.TextField(blank=True) class Meta: db_table = "parsers_load_log" constraints = [ models.UniqueConstraint( fields=["source", "batch_id"], name="unique_load_batch_per_source", ), ] ``` #### IndustrialCertificateRecord Сертификат промышленного производства: ```python class IndustrialCertificateRecord(TimestampMixin, models.Model): load_batch = models.PositiveIntegerField(db_index=True) issue_date = models.CharField(max_length=15, blank=True) certificate_number = models.CharField(max_length=100, db_index=True) expiry_date = models.CharField(max_length=15, blank=True) certificate_file_url = models.TextField(blank=True) organisation_name = models.TextField() inn = models.CharField(max_length=20, db_index=True) ogrn = models.CharField(max_length=20, db_index=True) class Meta: db_table = "parsers_industrial_certificate" constraints = [ models.UniqueConstraint( fields=["certificate_number"], name="unique_certificate_number", ), ] ``` #### ManufacturerRecord Производитель из реестра Минпромторга: ```python class ManufacturerRecord(TimestampMixin, models.Model): load_batch = models.PositiveIntegerField(db_index=True) inn = models.CharField(max_length=20, db_index=True) ogrn = models.CharField(max_length=20, db_index=True) manufacturer_name = models.TextField() # ... другие поля ``` #### InspectionRecord Запись о проверке (proverki.gov.ru): ```python class InspectionRecord(TimestampMixin, models.Model): class InspectionType(models.TextChoices): FZ294 = "294", "ФЗ-294 (традиционные)" FZ248 = "248", "ФЗ-248 (новые)" load_batch = models.PositiveIntegerField(db_index=True) inspection_id = models.CharField(max_length=100, unique=True) inspection_type = models.CharField(max_length=3, choices=InspectionType.choices) data_year = models.PositiveIntegerField() data_month = models.PositiveIntegerField() # ... другие поля ``` #### FinancialReport Бухгалтерский отчёт (ФНС): ```python class FinancialReport(TimestampMixin, models.Model): class SourceType(models.TextChoices): FILE_WATCH = "file_watch", "Мониторинг папок" MANUAL = "manual", "Ручная загрузка" external_id = models.CharField(max_length=100, unique=True) ogrn = models.CharField(max_length=20, db_index=True) file_name = models.CharField(max_length=255) file_hash = models.CharField(max_length=64, db_index=True) source = models.CharField(max_length=20, choices=SourceType.choices) batch_id = models.PositiveIntegerField(db_index=True) ``` ### 5.3. Модели ядра (apps.core) #### BackgroundJob Отслеживание прогресса задач: ```python class BackgroundJob(TimestampMixin, models.Model): class Status(models.TextChoices): PENDING = "pending", "Ожидание" RUNNING = "running", "Выполняется" COMPLETED = "completed", "Завершено" FAILED = "failed", "Ошибка" task_id = models.CharField(max_length=100, unique=True) task_name = models.CharField(max_length=255) status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING) progress = models.PositiveIntegerField(default=0) # 0-100 message = models.TextField(blank=True) meta = models.JSONField(default=dict, blank=True) result = models.JSONField(default=dict, blank=True) error = models.TextField(blank=True) started_at = models.DateTimeField(null=True, blank=True) completed_at = models.DateTimeField(null=True, blank=True) ``` --- ## 6. СЕРВИСЫ И БИЗНЕС-ЛОГИКА ### 6.1. BackgroundJobService (apps.core) Управление фоновыми задачами: ```python class BackgroundJobService: @classmethod def create_job( cls, task_id: str, task_name: str, meta: dict | None = None, ) -> BackgroundJob: """Создать новую задачу.""" return BackgroundJob.objects.create( task_id=task_id, task_name=task_name, meta=meta or {}, ) @classmethod def get_by_task_id(cls, task_id: str) -> BackgroundJob | None: """Получить задачу по ID.""" return BackgroundJob.objects.filter(task_id=task_id).first() ``` **Методы:** | Метод | Описание | |-------|----------| | `create_job()` | Создание новой задачи | | `get_by_task_id()` | Получение задачи по ID | | `mark_started()` | Установка статуса "running" | | `update_progress()` | Обновление прогресса (%) | | `complete()` | Завершение задачи | | `fail()` | Пометка как ошибочной | ### 6.2. ParserLoadLogService (apps.parsers) Логирование загрузок: ```python class ParserLoadLogService: @classmethod def create_load_log_with_next_batch_id( cls, source: ParserLoadLog.Source, status: str = "in_progress", ) -> tuple[ParserLoadLog, int]: """Создать лог загрузки со следующим batch_id.""" last_log = ParserLoadLog.objects.filter(source=source).order_by('-batch_id').first() next_batch_id = (last_log.batch_id + 1) if last_log else 1 log = ParserLoadLog.objects.create( batch_id=next_batch_id, source=source, status=status, ) return log, next_batch_id @classmethod def update( cls, log: ParserLoadLog, status: str | None = None, records_count: int | None = None, error_message: str | None = None, ) -> None: """Обновить лог загрузки.""" # ... @classmethod def mark_failed(cls, log: ParserLoadLog, error: str) -> None: """Пометить как неудачную.""" # ... ``` ### 6.3. Сервисы парсеров #### IndustrialCertificateService ```python class IndustrialCertificateService: @classmethod def save_certificates( cls, certificates: list[dict], batch_id: int, ) -> int: """Сохранить сертификаты с дедупликацией.""" saved = 0 for cert_data in certificates: obj, created = IndustrialCertificateRecord.objects.update_or_create( certificate_number=cert_data['certificate_number'], defaults={**cert_data, 'load_batch': batch_id}, ) if created: saved += 1 return saved ``` #### InspectionService ```python class InspectionService: @classmethod def get_last_loaded_period( cls, is_federal_law_248: bool, ) -> tuple[int | None, int | None]: """Получить последний загруженный период.""" last_log = InspectionRecord.objects.filter( is_federal_law_248=is_federal_law_248 ).order_by('-data_year', '-data_month').first() if last_log: return last_log.data_year, last_log.data_month return None, None @classmethod def save_inspections( cls, inspections: list[dict], batch_id: int, is_federal_law_248: bool = False, data_year: int | None = None, data_month: int | None = None, ) -> int: """Сохранить данные о проверках.""" # ... ``` #### FNSReportService ```python class FNSReportService: @classmethod def exists_by_hash(cls, file_hash: str) -> bool: """Проверить существование файла по хешу.""" return FinancialReport.objects.filter(file_hash=file_hash).exists() @classmethod def save_report( cls, external_id: str, ogrn: str, file_name: str, file_hash: str, source: FinancialReport.SourceType, batch_id: int, lines_data: list[dict], ) -> FinancialReport: """Сохранить отчёт со строками.""" # ... ``` --- ## 7. ЗАДАЧИ CELERY ### 7.1. Конфигурация (config/celery.py) ```python app = Celery("project") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks() app.conf.beat_schedule = { "parse-industrial-production-daily": { "task": "apps.parsers.tasks.parse_industrial_production", "schedule": 86400.0, # Каждые 24 часа }, "parse-manufactures-daily": { "task": "apps.parsers.tasks.parse_manufactures", "schedule": 86400.0, }, "scan-fns-directory": { "task": "apps.parsers.tasks.scan_fns_directory", "schedule": 300.0, # Каждые 5 минут }, } ``` ### 7.2. Основные задачи #### parse_industrial_production ```python @shared_task(bind=True) def parse_industrial_production( self, proxies: list[str] | None = None, client_adapter: BaseAdapter | None = None, ) -> dict: """Парсинг сертификатов Минпромторга.""" source = ParserLoadLog.Source.INDUSTRIAL load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id( source=source, status="in_progress" ) task_id = self.request.id or str(uuid.uuid4()) # Создание BackgroundJob job = BackgroundJobService.create_job( task_id=task_id, task_name="apps.parsers.tasks.parse_industrial_production", meta={"source": source, "batch_id": batch_id}, ) job.mark_started() try: # Парсинг if proxies is None: proxies = ProxyService.get_active_proxies_or_none() with IndustrialProductionClient(proxies=proxies) as client: certificates = client.fetch_certificates() # Сохранение saved_count = IndustrialCertificateService.save_certificates( certificates, batch_id=batch_id ) ParserLoadLogService.update(load_log, status="success", records_count=saved_count) job.complete(result={"batch_id": batch_id, "saved": saved_count}) return {"batch_id": batch_id, "saved": saved_count, "status": "success"} except Exception as e: ParserLoadLogService.mark_failed(load_log, str(e)) job.fail(error=str(e)) return {"status": "failed", "error": str(e)} ``` #### sync_inspections Автоматическая синхронизация проверок: ```python @shared_task(bind=True) def sync_inspections( self, proxies: list[str] | None = None, use_playwright: bool | None = None, current_year: int | None = None, current_month: int | None = None, ) -> dict: """Синхронизация данных о проверках. Логика: 1. Проверяет последнюю загруженную дату в БД 2. Если данных нет — начинает с 01.01.2025 3. Загружает месяц за месяцем до текущего 4. Загружает оба типа проверок (ФЗ-294 и ФЗ-248) 5. При отсутствии данных (2 пустых месяца) — прекращает """ # ... ``` #### process_fns_file ```python @shared_task(bind=True) def process_fns_file(self, file_path: str | Path) -> dict: """Обработка файла ФНС.""" task_id = self.request.id file_path = Path(file_path) # Создание BackgroundJob job = BackgroundJobService.create_job( task_id=task_id, task_name="apps.parsers.tasks.process_fns_file", meta={"file": file_path.name}, ) job.mark_started() try: # Проверка дубликата file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest() if FNSReportService.exists_by_hash(file_hash): job.complete(result={"status": "skipped", "reason": "duplicate"}) return {"status": "skipped"} # Парсинг Excel parsed = FNSExcelParser.parse_file(file_path) # Сохранение report = FNSReportService.save_report( external_id=parsed.external_id, ogrn=parsed.ogrn, file_name=file_path.name, file_hash=file_hash, source=FinancialReport.SourceType.FILE_WATCH, lines_data=[asdict(line) for line in parsed.lines], ) job.complete(result={"report_id": report.id}) return {"status": "success"} except Exception as e: job.fail(error=str(e)) return {"status": "failed", "error": str(e)} ``` ### 7.3. Вызов задач ```python # Асинхронно from apps.parsers.tasks import parse_industrial_production result = parse_industrial_production.delay() print(result.id) # ID задачи # С параметрами result = parse_inspections.delay(year=2025, month=10, is_federal_law_248=False) # Синхронно (для тестов) from apps.parsers.tasks import parse_manufactures result = parse_manufactures.apply(kwargs={"proxies": [...]}) ``` --- ## 8. КЛИЕНТЫ И ПАРСЕРЫ ### 8.1. Структура клиентов ``` apps/parsers/clients/ ├── minpromtorg/ │ ├── __init__.py │ ├── base.py # Базовый класс │ ├── industrial_production.py │ └── manufactures.py ├── proverki/ │ ├── __init__.py │ └── proverki_client.py ├── zakupki/ │ └── zakupki_client.py └── fns/ ├── __init__.py ├── parser.py # FNSExcelParser └── directory_scanner.py ``` ### 8.2. Клиент Минпромторга ```python class IndustrialProductionClient: """Клиент для API Минпромторга (сертификаты).""" BASE_URL = "https://minpromtorg.gov.ru/api" def __init__( self, proxies: list[str] | None = None, http_adapter: BaseAdapter | None = None, ): self.session = requests.Session() if proxies: self.session.proxies = {"http": proxies[0], "https": proxies[0]} if http_adapter: self.session.mount("http://", http_adapter) def fetch_certificates(self) -> list[dict]: """Получить список сертификатов.""" response = self.session.get(f"{self.BASE_URL}/certificates") response.raise_for_status() return response.json() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.session.close() ``` ### 8.3. Клиент proverki.gov.ru ```python class ProverkiClient: """Клиент для proverki.gov.ru с поддержкой Playwright.""" def __init__( self, proxies: list[str] | None = None, use_playwright: bool = True, ): self.proxies = proxies self.use_playwright = use_playwright self.browser = None self.page = None def _init_browser(self): """Инициализация Playwright.""" from playwright.sync_api import sync_playwright playwright = sync_playwright().start() self.browser = playwright.chromium.launch(headless=True) self.page = self.browser.new_page() def fetch_inspections( self, year: int | None = None, month: int | None = None, file_url: str | None = None, progress_callback: Callable | None = None, ) -> list[dict]: """Получить данные о проверках.""" if self.use_playwright: self._init_browser() return self._fetch_with_playwright(year, month, file_url, progress_callback) else: return self._fetch_with_requests(year, month, file_url) def _fetch_with_playwright(...) -> list[dict]: """Парсинг через Playwright (JS-rendering).""" self.page.goto("https://proverki.gov.ru") # ... навигация и парсинг return inspections def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if self.browser: self.browser.close() ``` ### 8.4. Парсер ФНС ```python class FNSExcelParser: """Парсер Excel-файлов ФНС.""" @staticmethod def parse_file(file_path: Path) -> ParsedData: """Разобрать Excel-файл.""" import pandas as pd df = pd.read_excel(file_path, sheet_name=0) # Извлечение данных external_id = df.iloc[0, 0] # Пример ogrn = df.iloc[1, 0] lines = [] for _, row in df.iterrows(): lines.append(FNSLine( period=row['period'], revenue=row['revenue'], # ... )) return ParsedData( external_id=external_id, ogrn=ogrn, lines=lines, ) ``` --- ## 9. API И ПРЕДСТАВИТЕЛИ ### 9.1. Сериализаторы ```python # apps/parsers/serializers.py class IndustrialCertificateSerializer(serializers.ModelSerializer): class Meta: model = IndustrialCertificateRecord fields = [ 'id', 'certificate_number', 'issue_date', 'expiry_date', 'organisation_name', 'inn', 'ogrn', 'created_at', ] read_only_fields = ['id', 'created_at'] class ParserLoadLogSerializer(serializers.ModelSerializer): class Meta: model = ParserLoadLog fields = [ 'id', 'batch_id', 'source', 'records_count', 'status', 'error_message', 'created_at', ] ``` ### 9.2. Классы представлений ```python # apps/parsers/views.py from rest_framework import viewsets, permissions from apps.parsers.models import IndustrialCertificateRecord from apps.parsers.serializers import IndustrialCertificateSerializer class IndustrialCertificateViewSet(viewsets.ReadOnlyModelViewSet): """Только чтение для сертификатов.""" queryset = IndustrialCertificateRecord.objects.all() serializer_class = IndustrialCertificateSerializer permission_classes = [permissions.IsAuthenticated] filter_backends = [filters.SearchFilter, filters.OrderingFilter] search_fields = ['organisation_name', 'certificate_number'] ordering_fields = ['created_at', 'issue_date'] ``` ### 9.3. Маршруты (URLs) ```python # apps/parsers/urls.py from rest_framework.routers import DefaultRouter from apps.parsers.views import IndustrialCertificateViewSet router = DefaultRouter() router.register('certificates', IndustrialCertificateViewSet, basename='certificate') urlpatterns = router.urls ``` ```python # config/api_v1_urls.py from django.urls import path, include urlpatterns = [ path('parsers/', include('apps.parsers.urls')), # ... ] ``` --- ## 10. ТЕСТИРОВАНИЕ ### 10.1. Конфигурация (pytest) ```python # tests/conftest.py import pytest from django.conf import settings @pytest.fixture(autouse=True) def enable_db_access_for_all_tests(db): pass @pytest.fixture def api_client(): from rest_framework.test import APIClient return APIClient() @pytest.fixture def user(): from apps.user.models import User return User.objects.create_user(email='test@test.com', password='test') ``` ### 10.2. Пример теста ```python # tests/apps/parsers/test_services.py import pytest from model_bakery import baker from apps.parsers.services import IndustrialCertificateService from apps.parsers.models import IndustrialCertificateRecord @pytest.mark.django_db class TestIndustrialCertificateService: def test_save_certificates(self): certificates = [ { 'certificate_number': 'CERT-001', 'issue_date': '2025-01-01', 'organisation_name': 'ООО Ромашка', 'inn': '1234567890', 'ogrn': '1234567890123', } ] saved = IndustrialCertificateService.save_certificates(certificates, batch_id=1) assert saved == 1 assert IndustrialCertificateRecord.objects.count() == 1 def test_save_certificates_duplicate(self): # Создать дубликат baker.make( IndustrialCertificateRecord, certificate_number='CERT-001' ) certificates = [ { 'certificate_number': 'CERT-001', 'issue_date': '2025-01-01', 'organisation_name': 'ООО Ромашка', 'inn': '1234567890', 'ogrn': '1234567890123', } ] saved = IndustrialCertificateService.save_certificates(certificates, batch_id=2) assert saved == 0 # Не сохранён (дубликат) assert IndustrialCertificateRecord.objects.count() == 1 ``` ### 10.3. Тесты задач Celery ```python # tests/apps/parsers/test_tasks.py import pytest from apps.parsers.tasks import parse_industrial_production @pytest.mark.django_db class TestParseTasks: def test_parse_industrial_production(self, mocker): # Мок клиента mock_client = mocker.patch( 'apps.parsers.tasks.IndustrialProductionClient' ) mock_client.return_value.__enter__.return_value.fetch_certificates.return_value = [] result = parse_industrial_production.delay() assert result.get()['status'] == 'success' ``` ### 10.4. Команды тестирования ```bash # Все тесты make test # С покрытием make test-cov # Только быстрые (без медленных) make test-fast TARGET="--fast" # Параллельно make test-parallel # Конкретный модуль python -m pytest tests/apps/parsers/test_services.py -v ``` --- ## 11. РАЗРАБОТКА И ОТЛАДКА ### 11.1. Локальный запуск ```bash # Миграции cd src python manage.py makemigrations python manage.py migrate # Создание суперпользователя python manage.py createsuperuser # Запуск сервера разработки python manage.py runserver # Запуск Celery worker (в отдельном терминале) celery -A config worker --loglevel=info # Запуск Celery beat celery -A config beat --loglevel=info ``` ### 11.2. Отладка Celery ```bash # Worker с отладкой celery -A config worker --loglevel=debug --pool=solo # Проверка очереди celery -A config inspect active celery -A config inspect registered # Очистка очереди celery -A config purge ``` ### 11.3. Django Debug Toolbar ```bash # Установка (уже в requirements-dev.txt) uv pip install django-debug-toolbar # Добавить в settings/dev.py: INSTALLED_APPS += ['debug_toolbar'] MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware'] ``` ### 11.4. Логирование ```python # config/settings/dev.py LOGGING = { 'version': 1, 'handlers': { 'console': {'class': 'logging.StreamHandler'}, 'file': { 'class': 'logging.FileHandler', 'filename': 'logs/debug.log', }, }, 'loggers': { 'apps.parsers': { 'handlers': ['console', 'file'], 'level': 'DEBUG', }, }, } ``` --- ## 12. РАСШИРЕНИЕ ФУНКЦИОНАЛЬНОСТИ ### 12.1. Добавление нового парсера 1. **Создать модель:** ```python # apps/parsers/models.py class NewSourceRecord(TimestampMixin, models.Model): load_batch = models.PositiveIntegerField(db_index=True) # ... поля class Meta: db_table = "parsers_new_source" constraints = [ models.UniqueConstraint( fields=["unique_field"], name="unique_new_source", ), ] ``` 2. **Создать сервис:** ```python # apps/parsers/services.py class NewSourceService: @classmethod def save_records(cls, records: list[dict], batch_id: int) -> int: saved = 0 for data in records: obj, created = NewSourceRecord.objects.update_or_create( unique_field=data['unique_field'], defaults={**data, 'load_batch': batch_id}, ) if created: saved += 1 return saved ``` 3. **Создать клиента:** ```python # apps/parsers/clients/new_source.py class NewSourceClient: def __init__(self, proxies=None): self.session = requests.Session() def fetch_data(self) -> list[dict]: response = self.session.get("https://api.example.com/data") return response.json() ``` 4. **Создать задачу Celery:** ```python # apps/parsers/tasks.py @shared_task(bind=True) def parse_new_source(self, proxies=None): source = ParserLoadLog.Source.NEW_SOURCE load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(source) job = BackgroundJobService.create_job( task_id=self.request.id, task_name="parse_new_source", meta={"source": source}, ) job.mark_started() try: with NewSourceClient(proxies=proxies) as client: records = client.fetch_data() saved = NewSourceService.save_records(records, batch_id) ParserLoadLogService.update(load_log, status="success", records_count=saved) job.complete(result={"saved": saved}) return {"status": "success"} except Exception as e: ParserLoadLogService.mark_failed(load_log, str(e)) job.fail(error=str(e)) return {"status": "failed"} ``` 5. **Добавить в admin:** ```python # apps/parsers/admin.py @admin.register(NewSourceRecord) class NewSourceRecordAdmin(admin.ModelAdmin): list_display = ['id', 'unique_field', 'load_batch', 'created_at'] list_filter = ['created_at'] search_fields = ['unique_field'] ``` ### 12.2. Добавление периодической задачи ```python # config/celery.py app.conf.beat_schedule = { # ... существующие "parse-new-source-hourly": { "task": "apps.parsers.tasks.parse_new_source", "schedule": 3600.0, # Каждый час }, } ``` ### 12.3. Миграции ```bash cd src python manage.py makemigrations parsers python manage.py migrate ``` --- ## ПРИЛОЖЕНИЕ А. ПОЛЕЗНЫЕ КОМАНДЫ ```bash # Создание миграций python manage.py makemigrations python manage.py migrate # Django shell python manage.py shell # Проверка кода ruff check src/ black --check src/ mypy src/ # Форматирование black src/ isort src/ # Безопасность bandit -r src/ # Тесты pytest tests/ -v pytest tests/ --cov=src/ ``` --- ## ПРИЛОЖЕНИЕ Б. СТРУКТУРА БД ```sql -- Основные таблицы parsers_load_log -- Логи загрузок parsers_industrial_certificate -- Сертификаты parsers_manufacturer_record -- Производители parsers_inspection_record -- Проверки parsers_financial_report -- Отчёты ФНС core_backgroundjob -- Задачи auth_user -- Пользователи ``` --- *Документ составлен на основе версии кода от 2026-01-21*