38 KiB
38 KiB
РУКОВОДСТВО СИСТЕМНОГО ПРОГРАММИСТА
Система ETL MOSTOVIK
Версия документа: 1.0
Дата: 2026-01-21
СОДЕРЖАНИЕ
- Общие сведения
- Структура проекта
- Технологический стек
- Конфигурация и зависимости
- Модели данных
- Сервисы и бизнес-логика
- Задачи Celery
- Клиенты и парсеры
- API и представители
- Тестирование
- Разработка и отладка
- Расширение функциональности
1. ОБЩИЕ СВЕДЕНИЯ
1.1. Назначение документа
Руководство содержит техническую информацию для разработчиков и системных программистов, работающих с системой MOSTOVIK.
1.2. Описание системы
MOSTOVIK — ETL-система для сбора, обработки и хранения данных из государственных источников:
- Минпромторг (minpromtorg.gov.ru) — сертификаты промышленного производства, реестр производителей
- Проверки.гов.ру (proverki.gov.ru) — данные о проверках (ФЗ-294, ФЗ-248)
- ФНС — бухгалтерская отчётность
1.3. Основные возможности
- Автоматический парсинг данных через Celery
- Отслеживание прогресса задач (BackgroundJob)
- Логирование всех операций (ParserLoadLog)
- Повторные попытки при ошибках
- Потоковая обработка больших файлов
- Дедупликация данных
2. СТРУКТУРА ПРОЕКТА
mostovik-backend/
├── src/ # Исходный код Django
│ ├── config/ # Конфигурация Django
│ │ ├── settings/ # Настройки (base, dev, prod, test)
│ │ │ ├── base.py # Базовая конфигурация
│ │ │ ├── dev.py # Разработка
│ │ │ ├── production.py # Production
│ │ │ └── test.py # Тесты
│ │ ├── celery.py # Конфигурация Celery
│ │ ├── urls.py # Корневые URL
│ │ ├── api_v1_urls.py # Маршруты API
│ │ ├── wsgi.py # Точка входа WSGI
│ │ └── asgi.py # Точка входа ASGI
│ ├── apps/ # Приложения Django
│ │ ├── core/ # Базовые компоненты
│ │ │ ├── models.py # Базовые модели (TimestampMixin)
│ │ │ ├── services.py # BackgroundJobService
│ │ │ ├── tasks.py # Общие задачи
│ │ │ ├── views.py # Базовые классы представлений
│ │ │ ├── serializers.py
│ │ │ ├── filters.py
│ │ │ ├── pagination.py
│ │ │ ├── permissions.py
│ │ │ └── middleware.py
│ │ ├── parsers/ # Парсеры данных
│ │ │ ├── models.py # Модели данных
│ │ │ ├── services.py # Бизнес-логика
│ │ │ ├── tasks.py # Задачи Celery
│ │ │ ├── clients/ # API-клиенты
│ │ │ │ ├── minpromtorg/
│ │ │ │ ├── proverki/
│ │ │ │ ├── zakupki/
│ │ │ │ └── fns/
│ │ │ ├── admin.py # Django admin
│ │ │ ├── urls.py # Маршруты
│ │ │ ├── views.py # Классы представлений
│ │ │ └── serializers.py
│ │ └── user/ # Пользователи
│ │ ├── models.py
│ │ ├── admin.py
│ │ └── ...
│ └── manage.py # Утилита управления Django
├── tests/ # Тесты
│ ├── apps/
│ │ ├── user/
│ │ └── parsers/
│ ├── conftest.py # Конфигурация pytest
│ └── factories.py # Фабрики Factory Boy
├── deploy/ # Развёртывание
│ ├── scripts/ # Скрипты
│ ├── systemd/ # Файлы systemd
│ └── apache/ # Конфигурация Apache
├── docker/ # Конфигурация Docker
├── scripts/ # Вспомогательные скрипты
├── logs/ # Логи
├── .env.example # Пример окружения
├── pyproject.toml # Зависимости и конфигурация
├── requirements.txt # Производственные зависимости
├── requirements-dev.txt # Зависимости для разработки
└── Makefile # Команды разработки
3. ТЕХНОЛОГИЧЕСКИЙ СТЕК
3.1. Основные технологии
| Компонент | Технология | Версия |
|---|---|---|
| Язык | Python | 3.11 |
| Фреймворк | Django | 3.2.25 |
| API | Django REST Framework | 3.14.0 |
| БД | PostgreSQL | 15.10 |
| Кеш | Redis | 7.x |
| Очереди | Celery | 5.3.6 |
| Веб-сервер | Gunicorn + Apache | 21.2.0 / 2.4.57 |
3.2. Библиотеки для парсинга
| Назначение | Библиотека | Версия |
|---|---|---|
| Автоматизация браузера | Playwright | 1.57.0+ |
| Web scraping | Scrapy | 2.11.2 |
| Browser automation | Selenium | 4.17.2 |
| HTML parsing | BeautifulSoup4 | 4.12.3 |
| HTTP-запросы | requests | 2.31.0 |
3.3. Обработка данных
| Назначение | Библиотека | Версия |
|---|---|---|
| Таблицы | pandas | 2.0.3 |
| Excel | openpyxl | 3.1.5+ |
| Word | python-docx | 1.2.0+ |
| CSV | built-in | - |
3.4. Инструменты разработки
| Назначение | Инструмент | Версия |
|---|---|---|
| Тестирование | pytest | 7.4.4 |
| Покрытие | coverage | 7.4.0 |
| Linting | ruff | 0.1.14 |
| Форматирование | black | 23.12.1 |
| Сортировка импортов | isort | 5.13.2 |
| Проверка типов | mypy | 1.8.0 |
| Безопасность | bandit | 1.7.5 |
4. КОНФИГУРАЦИЯ И ЗАВИСИМОСТИ
4.1. Установка зависимостей
# Через uv (рекомендуется)
uv pip install -e ".[dev]"
# Или через requirements
uv pip install -r requirements.txt
uv pip install -r requirements-dev.txt
4.2. Конфигурация окружения
cp .env.example .env
Ключевые переменные для разработки:
# Django
DJANGO_SETTINGS_MODULE=config.settings.dev
DEBUG=True
SECRET_KEY=django-insecure-dev-key
# База данных
POSTGRES_DB=mostovik
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_HOST=127.0.0.1
POSTGRES_PORT=5432
# Redis
REDIS_URL=redis://127.0.0.1:6379/0
CELERY_BROKER_URL=redis://127.0.0.1:6379/0
# API парсеров
ZAKUPKI_TOKEN=<token>
CHECKO_API_KEY=<key>
4.3. Конфигурация инструментов (pyproject.toml)
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "config.settings.test"
testpaths = ["tests"]
addopts = ["--verbose", "--tb=short", "--reuse-db"]
[tool.ruff]
line-length = 88
target-version = "py311"
[tool.black]
line-length = 88
target-version = ['py311']
[tool.mypy]
python_version = "3.11"
check_untyped_defs = true
plugins = ["mypy_django_plugin.main"]
5. МОДЕЛИ ДАННЫХ
5.1. Базовые модели (apps.core)
TimestampMixin — миксин для автоматического добавления полей времени:
class TimestampMixin(models.Model):
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
abstract = True
5.2. Модели парсеров (apps.parsers)
ParserLoadLog
Лог загрузок парсеров:
class ParserLoadLog(TimestampMixin, models.Model):
class Source(models.TextChoices):
INDUSTRIAL = "industrial", "Промышленное производство"
MANUFACTURES = "manufactures", "Реестр производителей"
INSPECTIONS = "inspections", "Единый реестр проверок"
PROCUREMENTS = "procurements", "Госзакупки"
FNS_REPORTS = "fns_reports", "Бухгалтерская отчётность ФНС"
batch_id = models.PositiveIntegerField(db_index=True)
source = models.CharField(max_length=50, choices=Source.choices, db_index=True)
records_count = models.PositiveIntegerField(default=0)
status = models.CharField(max_length=20, default="success")
error_message = models.TextField(blank=True)
class Meta:
db_table = "parsers_load_log"
constraints = [
models.UniqueConstraint(
fields=["source", "batch_id"],
name="unique_load_batch_per_source",
),
]
IndustrialCertificateRecord
Сертификат промышленного производства:
class IndustrialCertificateRecord(TimestampMixin, models.Model):
load_batch = models.PositiveIntegerField(db_index=True)
issue_date = models.CharField(max_length=15, blank=True)
certificate_number = models.CharField(max_length=100, db_index=True)
expiry_date = models.CharField(max_length=15, blank=True)
certificate_file_url = models.TextField(blank=True)
organisation_name = models.TextField()
inn = models.CharField(max_length=20, db_index=True)
ogrn = models.CharField(max_length=20, db_index=True)
class Meta:
db_table = "parsers_industrial_certificate"
constraints = [
models.UniqueConstraint(
fields=["certificate_number"],
name="unique_certificate_number",
),
]
ManufacturerRecord
Производитель из реестра Минпромторга:
class ManufacturerRecord(TimestampMixin, models.Model):
load_batch = models.PositiveIntegerField(db_index=True)
inn = models.CharField(max_length=20, db_index=True)
ogrn = models.CharField(max_length=20, db_index=True)
manufacturer_name = models.TextField()
# ... другие поля
InspectionRecord
Запись о проверке (proverki.gov.ru):
class InspectionRecord(TimestampMixin, models.Model):
class InspectionType(models.TextChoices):
FZ294 = "294", "ФЗ-294 (традиционные)"
FZ248 = "248", "ФЗ-248 (новые)"
load_batch = models.PositiveIntegerField(db_index=True)
inspection_id = models.CharField(max_length=100, unique=True)
inspection_type = models.CharField(max_length=3, choices=InspectionType.choices)
data_year = models.PositiveIntegerField()
data_month = models.PositiveIntegerField()
# ... другие поля
FinancialReport
Бухгалтерский отчёт (ФНС):
class FinancialReport(TimestampMixin, models.Model):
class SourceType(models.TextChoices):
FILE_WATCH = "file_watch", "Мониторинг папок"
MANUAL = "manual", "Ручная загрузка"
external_id = models.CharField(max_length=100, unique=True)
ogrn = models.CharField(max_length=20, db_index=True)
file_name = models.CharField(max_length=255)
file_hash = models.CharField(max_length=64, db_index=True)
source = models.CharField(max_length=20, choices=SourceType.choices)
batch_id = models.PositiveIntegerField(db_index=True)
5.3. Модели ядра (apps.core)
BackgroundJob
Отслеживание прогресса задач:
class BackgroundJob(TimestampMixin, models.Model):
class Status(models.TextChoices):
PENDING = "pending", "Ожидание"
RUNNING = "running", "Выполняется"
COMPLETED = "completed", "Завершено"
FAILED = "failed", "Ошибка"
task_id = models.CharField(max_length=100, unique=True)
task_name = models.CharField(max_length=255)
status = models.CharField(max_length=20, choices=Status.choices, default=Status.PENDING)
progress = models.PositiveIntegerField(default=0) # 0-100
message = models.TextField(blank=True)
meta = models.JSONField(default=dict, blank=True)
result = models.JSONField(default=dict, blank=True)
error = models.TextField(blank=True)
started_at = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)
6. СЕРВИСЫ И БИЗНЕС-ЛОГИКА
6.1. BackgroundJobService (apps.core)
Управление фоновыми задачами:
class BackgroundJobService:
@classmethod
def create_job(
cls,
task_id: str,
task_name: str,
meta: dict | None = None,
) -> BackgroundJob:
"""Создать новую задачу."""
return BackgroundJob.objects.create(
task_id=task_id,
task_name=task_name,
meta=meta or {},
)
@classmethod
def get_by_task_id(cls, task_id: str) -> BackgroundJob | None:
"""Получить задачу по ID."""
return BackgroundJob.objects.filter(task_id=task_id).first()
Методы:
| Метод | Описание |
|---|---|
create_job() |
Создание новой задачи |
get_by_task_id() |
Получение задачи по ID |
mark_started() |
Установка статуса "running" |
update_progress() |
Обновление прогресса (%) |
complete() |
Завершение задачи |
fail() |
Пометка как ошибочной |
6.2. ParserLoadLogService (apps.parsers)
Логирование загрузок:
class ParserLoadLogService:
@classmethod
def create_load_log_with_next_batch_id(
cls,
source: ParserLoadLog.Source,
status: str = "in_progress",
) -> tuple[ParserLoadLog, int]:
"""Создать лог загрузки со следующим batch_id."""
last_log = ParserLoadLog.objects.filter(source=source).order_by('-batch_id').first()
next_batch_id = (last_log.batch_id + 1) if last_log else 1
log = ParserLoadLog.objects.create(
batch_id=next_batch_id,
source=source,
status=status,
)
return log, next_batch_id
@classmethod
def update(
cls,
log: ParserLoadLog,
status: str | None = None,
records_count: int | None = None,
error_message: str | None = None,
) -> None:
"""Обновить лог загрузки."""
# ...
@classmethod
def mark_failed(cls, log: ParserLoadLog, error: str) -> None:
"""Пометить как неудачную."""
# ...
6.3. Сервисы парсеров
IndustrialCertificateService
class IndustrialCertificateService:
@classmethod
def save_certificates(
cls,
certificates: list[dict],
batch_id: int,
) -> int:
"""Сохранить сертификаты с дедупликацией."""
saved = 0
for cert_data in certificates:
obj, created = IndustrialCertificateRecord.objects.update_or_create(
certificate_number=cert_data['certificate_number'],
defaults={**cert_data, 'load_batch': batch_id},
)
if created:
saved += 1
return saved
InspectionService
class InspectionService:
@classmethod
def get_last_loaded_period(
cls,
is_federal_law_248: bool,
) -> tuple[int | None, int | None]:
"""Получить последний загруженный период."""
last_log = InspectionRecord.objects.filter(
is_federal_law_248=is_federal_law_248
).order_by('-data_year', '-data_month').first()
if last_log:
return last_log.data_year, last_log.data_month
return None, None
@classmethod
def save_inspections(
cls,
inspections: list[dict],
batch_id: int,
is_federal_law_248: bool = False,
data_year: int | None = None,
data_month: int | None = None,
) -> int:
"""Сохранить данные о проверках."""
# ...
FNSReportService
class FNSReportService:
@classmethod
def exists_by_hash(cls, file_hash: str) -> bool:
"""Проверить существование файла по хешу."""
return FinancialReport.objects.filter(file_hash=file_hash).exists()
@classmethod
def save_report(
cls,
external_id: str,
ogrn: str,
file_name: str,
file_hash: str,
source: FinancialReport.SourceType,
batch_id: int,
lines_data: list[dict],
) -> FinancialReport:
"""Сохранить отчёт со строками."""
# ...
7. ЗАДАЧИ CELERY
7.1. Конфигурация (config/celery.py)
app = Celery("project")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
app.conf.beat_schedule = {
"parse-industrial-production-daily": {
"task": "apps.parsers.tasks.parse_industrial_production",
"schedule": 86400.0, # Каждые 24 часа
},
"parse-manufactures-daily": {
"task": "apps.parsers.tasks.parse_manufactures",
"schedule": 86400.0,
},
"scan-fns-directory": {
"task": "apps.parsers.tasks.scan_fns_directory",
"schedule": 300.0, # Каждые 5 минут
},
}
7.2. Основные задачи
parse_industrial_production
@shared_task(bind=True)
def parse_industrial_production(
self,
proxies: list[str] | None = None,
client_adapter: BaseAdapter | None = None,
) -> dict:
"""Парсинг сертификатов Минпромторга."""
source = ParserLoadLog.Source.INDUSTRIAL
load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(
source=source, status="in_progress"
)
task_id = self.request.id or str(uuid.uuid4())
# Создание BackgroundJob
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.parse_industrial_production",
meta={"source": source, "batch_id": batch_id},
)
job.mark_started()
try:
# Парсинг
if proxies is None:
proxies = ProxyService.get_active_proxies_or_none()
with IndustrialProductionClient(proxies=proxies) as client:
certificates = client.fetch_certificates()
# Сохранение
saved_count = IndustrialCertificateService.save_certificates(
certificates, batch_id=batch_id
)
ParserLoadLogService.update(load_log, status="success", records_count=saved_count)
job.complete(result={"batch_id": batch_id, "saved": saved_count})
return {"batch_id": batch_id, "saved": saved_count, "status": "success"}
except Exception as e:
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {"status": "failed", "error": str(e)}
sync_inspections
Автоматическая синхронизация проверок:
@shared_task(bind=True)
def sync_inspections(
self,
proxies: list[str] | None = None,
use_playwright: bool | None = None,
current_year: int | None = None,
current_month: int | None = None,
) -> dict:
"""Синхронизация данных о проверках.
Логика:
1. Проверяет последнюю загруженную дату в БД
2. Если данных нет — начинает с 01.01.2025
3. Загружает месяц за месяцем до текущего
4. Загружает оба типа проверок (ФЗ-294 и ФЗ-248)
5. При отсутствии данных (2 пустых месяца) — прекращает
"""
# ...
process_fns_file
@shared_task(bind=True)
def process_fns_file(self, file_path: str | Path) -> dict:
"""Обработка файла ФНС."""
task_id = self.request.id
file_path = Path(file_path)
# Создание BackgroundJob
job = BackgroundJobService.create_job(
task_id=task_id,
task_name="apps.parsers.tasks.process_fns_file",
meta={"file": file_path.name},
)
job.mark_started()
try:
# Проверка дубликата
file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
if FNSReportService.exists_by_hash(file_hash):
job.complete(result={"status": "skipped", "reason": "duplicate"})
return {"status": "skipped"}
# Парсинг Excel
parsed = FNSExcelParser.parse_file(file_path)
# Сохранение
report = FNSReportService.save_report(
external_id=parsed.external_id,
ogrn=parsed.ogrn,
file_name=file_path.name,
file_hash=file_hash,
source=FinancialReport.SourceType.FILE_WATCH,
lines_data=[asdict(line) for line in parsed.lines],
)
job.complete(result={"report_id": report.id})
return {"status": "success"}
except Exception as e:
job.fail(error=str(e))
return {"status": "failed", "error": str(e)}
7.3. Вызов задач
# Асинхронно
from apps.parsers.tasks import parse_industrial_production
result = parse_industrial_production.delay()
print(result.id) # ID задачи
# С параметрами
result = parse_inspections.delay(year=2025, month=10, is_federal_law_248=False)
# Синхронно (для тестов)
from apps.parsers.tasks import parse_manufactures
result = parse_manufactures.apply(kwargs={"proxies": [...]})
8. КЛИЕНТЫ И ПАРСЕРЫ
8.1. Структура клиентов
apps/parsers/clients/
├── minpromtorg/
│ ├── __init__.py
│ ├── base.py # Базовый класс
│ ├── industrial_production.py
│ └── manufactures.py
├── proverki/
│ ├── __init__.py
│ └── proverki_client.py
├── zakupki/
│ └── zakupki_client.py
└── fns/
├── __init__.py
├── parser.py # FNSExcelParser
└── directory_scanner.py
8.2. Клиент Минпромторга
class IndustrialProductionClient:
"""Клиент для API Минпромторга (сертификаты)."""
BASE_URL = "https://minpromtorg.gov.ru/api"
def __init__(
self,
proxies: list[str] | None = None,
http_adapter: BaseAdapter | None = None,
):
self.session = requests.Session()
if proxies:
self.session.proxies = {"http": proxies[0], "https": proxies[0]}
if http_adapter:
self.session.mount("http://", http_adapter)
def fetch_certificates(self) -> list[dict]:
"""Получить список сертификатов."""
response = self.session.get(f"{self.BASE_URL}/certificates")
response.raise_for_status()
return response.json()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.session.close()
8.3. Клиент proverki.gov.ru
class ProverkiClient:
"""Клиент для proverki.gov.ru с поддержкой Playwright."""
def __init__(
self,
proxies: list[str] | None = None,
use_playwright: bool = True,
):
self.proxies = proxies
self.use_playwright = use_playwright
self.browser = None
self.page = None
def _init_browser(self):
"""Инициализация Playwright."""
from playwright.sync_api import sync_playwright
playwright = sync_playwright().start()
self.browser = playwright.chromium.launch(headless=True)
self.page = self.browser.new_page()
def fetch_inspections(
self,
year: int | None = None,
month: int | None = None,
file_url: str | None = None,
progress_callback: Callable | None = None,
) -> list[dict]:
"""Получить данные о проверках."""
if self.use_playwright:
self._init_browser()
return self._fetch_with_playwright(year, month, file_url, progress_callback)
else:
return self._fetch_with_requests(year, month, file_url)
def _fetch_with_playwright(...) -> list[dict]:
"""Парсинг через Playwright (JS-rendering)."""
self.page.goto("https://proverki.gov.ru")
# ... навигация и парсинг
return inspections
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.browser:
self.browser.close()
8.4. Парсер ФНС
class FNSExcelParser:
"""Парсер Excel-файлов ФНС."""
@staticmethod
def parse_file(file_path: Path) -> ParsedData:
"""Разобрать Excel-файл."""
import pandas as pd
df = pd.read_excel(file_path, sheet_name=0)
# Извлечение данных
external_id = df.iloc[0, 0] # Пример
ogrn = df.iloc[1, 0]
lines = []
for _, row in df.iterrows():
lines.append(FNSLine(
period=row['period'],
revenue=row['revenue'],
# ...
))
return ParsedData(
external_id=external_id,
ogrn=ogrn,
lines=lines,
)
9. API И ПРЕДСТАВИТЕЛИ
9.1. Сериализаторы
# apps/parsers/serializers.py
class IndustrialCertificateSerializer(serializers.ModelSerializer):
class Meta:
model = IndustrialCertificateRecord
fields = [
'id', 'certificate_number', 'issue_date', 'expiry_date',
'organisation_name', 'inn', 'ogrn', 'created_at',
]
read_only_fields = ['id', 'created_at']
class ParserLoadLogSerializer(serializers.ModelSerializer):
class Meta:
model = ParserLoadLog
fields = [
'id', 'batch_id', 'source', 'records_count',
'status', 'error_message', 'created_at',
]
9.2. Классы представлений
# apps/parsers/views.py
from rest_framework import viewsets, permissions
from apps.parsers.models import IndustrialCertificateRecord
from apps.parsers.serializers import IndustrialCertificateSerializer
class IndustrialCertificateViewSet(viewsets.ReadOnlyModelViewSet):
"""Только чтение для сертификатов."""
queryset = IndustrialCertificateRecord.objects.all()
serializer_class = IndustrialCertificateSerializer
permission_classes = [permissions.IsAuthenticated]
filter_backends = [filters.SearchFilter, filters.OrderingFilter]
search_fields = ['organisation_name', 'certificate_number']
ordering_fields = ['created_at', 'issue_date']
9.3. Маршруты (URLs)
# apps/parsers/urls.py
from rest_framework.routers import DefaultRouter
from apps.parsers.views import IndustrialCertificateViewSet
router = DefaultRouter()
router.register('certificates', IndustrialCertificateViewSet, basename='certificate')
urlpatterns = router.urls
# config/api_v1_urls.py
from django.urls import path, include
urlpatterns = [
path('parsers/', include('apps.parsers.urls')),
# ...
]
10. ТЕСТИРОВАНИЕ
10.1. Конфигурация (pytest)
# tests/conftest.py
import pytest
from django.conf import settings
@pytest.fixture(autouse=True)
def enable_db_access_for_all_tests(db):
pass
@pytest.fixture
def api_client():
from rest_framework.test import APIClient
return APIClient()
@pytest.fixture
def user():
from apps.user.models import User
return User.objects.create_user(email='test@test.com', password='test')
10.2. Пример теста
# tests/apps/parsers/test_services.py
import pytest
from model_bakery import baker
from apps.parsers.services import IndustrialCertificateService
from apps.parsers.models import IndustrialCertificateRecord
@pytest.mark.django_db
class TestIndustrialCertificateService:
def test_save_certificates(self):
certificates = [
{
'certificate_number': 'CERT-001',
'issue_date': '2025-01-01',
'organisation_name': 'ООО Ромашка',
'inn': '1234567890',
'ogrn': '1234567890123',
}
]
saved = IndustrialCertificateService.save_certificates(certificates, batch_id=1)
assert saved == 1
assert IndustrialCertificateRecord.objects.count() == 1
def test_save_certificates_duplicate(self):
# Создать дубликат
baker.make(
IndustrialCertificateRecord,
certificate_number='CERT-001'
)
certificates = [
{
'certificate_number': 'CERT-001',
'issue_date': '2025-01-01',
'organisation_name': 'ООО Ромашка',
'inn': '1234567890',
'ogrn': '1234567890123',
}
]
saved = IndustrialCertificateService.save_certificates(certificates, batch_id=2)
assert saved == 0 # Не сохранён (дубликат)
assert IndustrialCertificateRecord.objects.count() == 1
10.3. Тесты задач Celery
# tests/apps/parsers/test_tasks.py
import pytest
from apps.parsers.tasks import parse_industrial_production
@pytest.mark.django_db
class TestParseTasks:
def test_parse_industrial_production(self, mocker):
# Мок клиента
mock_client = mocker.patch(
'apps.parsers.tasks.IndustrialProductionClient'
)
mock_client.return_value.__enter__.return_value.fetch_certificates.return_value = []
result = parse_industrial_production.delay()
assert result.get()['status'] == 'success'
10.4. Команды тестирования
# Все тесты
make test
# С покрытием
make test-cov
# Только быстрые (без медленных)
make test-fast TARGET="--fast"
# Параллельно
make test-parallel
# Конкретный модуль
python -m pytest tests/apps/parsers/test_services.py -v
11. РАЗРАБОТКА И ОТЛАДКА
11.1. Локальный запуск
# Миграции
cd src
python manage.py makemigrations
python manage.py migrate
# Создание суперпользователя
python manage.py createsuperuser
# Запуск сервера разработки
python manage.py runserver
# Запуск Celery worker (в отдельном терминале)
celery -A config worker --loglevel=info
# Запуск Celery beat
celery -A config beat --loglevel=info
11.2. Отладка Celery
# Worker с отладкой
celery -A config worker --loglevel=debug --pool=solo
# Проверка очереди
celery -A config inspect active
celery -A config inspect registered
# Очистка очереди
celery -A config purge
11.3. Django Debug Toolbar
# Установка (уже в requirements-dev.txt)
uv pip install django-debug-toolbar
# Добавить в settings/dev.py:
INSTALLED_APPS += ['debug_toolbar']
MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']
11.4. Логирование
# config/settings/dev.py
LOGGING = {
'version': 1,
'handlers': {
'console': {'class': 'logging.StreamHandler'},
'file': {
'class': 'logging.FileHandler',
'filename': 'logs/debug.log',
},
},
'loggers': {
'apps.parsers': {
'handlers': ['console', 'file'],
'level': 'DEBUG',
},
},
}
12. РАСШИРЕНИЕ ФУНКЦИОНАЛЬНОСТИ
12.1. Добавление нового парсера
- Создать модель:
# apps/parsers/models.py
class NewSourceRecord(TimestampMixin, models.Model):
load_batch = models.PositiveIntegerField(db_index=True)
# ... поля
class Meta:
db_table = "parsers_new_source"
constraints = [
models.UniqueConstraint(
fields=["unique_field"],
name="unique_new_source",
),
]
- Создать сервис:
# apps/parsers/services.py
class NewSourceService:
@classmethod
def save_records(cls, records: list[dict], batch_id: int) -> int:
saved = 0
for data in records:
obj, created = NewSourceRecord.objects.update_or_create(
unique_field=data['unique_field'],
defaults={**data, 'load_batch': batch_id},
)
if created:
saved += 1
return saved
- Создать клиента:
# apps/parsers/clients/new_source.py
class NewSourceClient:
def __init__(self, proxies=None):
self.session = requests.Session()
def fetch_data(self) -> list[dict]:
response = self.session.get("https://api.example.com/data")
return response.json()
- Создать задачу Celery:
# apps/parsers/tasks.py
@shared_task(bind=True)
def parse_new_source(self, proxies=None):
source = ParserLoadLog.Source.NEW_SOURCE
load_log, batch_id = ParserLoadLogService.create_load_log_with_next_batch_id(source)
job = BackgroundJobService.create_job(
task_id=self.request.id,
task_name="parse_new_source",
meta={"source": source},
)
job.mark_started()
try:
with NewSourceClient(proxies=proxies) as client:
records = client.fetch_data()
saved = NewSourceService.save_records(records, batch_id)
ParserLoadLogService.update(load_log, status="success", records_count=saved)
job.complete(result={"saved": saved})
return {"status": "success"}
except Exception as e:
ParserLoadLogService.mark_failed(load_log, str(e))
job.fail(error=str(e))
return {"status": "failed"}
- Добавить в admin:
# apps/parsers/admin.py
@admin.register(NewSourceRecord)
class NewSourceRecordAdmin(admin.ModelAdmin):
list_display = ['id', 'unique_field', 'load_batch', 'created_at']
list_filter = ['created_at']
search_fields = ['unique_field']
12.2. Добавление периодической задачи
# config/celery.py
app.conf.beat_schedule = {
# ... существующие
"parse-new-source-hourly": {
"task": "apps.parsers.tasks.parse_new_source",
"schedule": 3600.0, # Каждый час
},
}
12.3. Миграции
cd src
python manage.py makemigrations parsers
python manage.py migrate
ПРИЛОЖЕНИЕ А. ПОЛЕЗНЫЕ КОМАНДЫ
# Создание миграций
python manage.py makemigrations
python manage.py migrate
# Django shell
python manage.py shell
# Проверка кода
ruff check src/
black --check src/
mypy src/
# Форматирование
black src/
isort src/
# Безопасность
bandit -r src/
# Тесты
pytest tests/ -v
pytest tests/ --cov=src/
ПРИЛОЖЕНИЕ Б. СТРУКТУРА БД
-- Основные таблицы
parsers_load_log -- Логи загрузок
parsers_industrial_certificate -- Сертификаты
parsers_manufacturer_record -- Производители
parsers_inspection_record -- Проверки
parsers_financial_report -- Отчёты ФНС
core_backgroundjob -- Задачи
auth_user -- Пользователи
Документ составлен на основе версии кода от 2026-01-21