Files
mostovik-backend/src/apps/exchange/services.py
Aleksandr Meshchriakov 3de66cc25c
Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Successful in 3m16s
CI/CD Pipeline / Run Tests (push) Successful in 3m26s
CI/CD Pipeline / Telegram Notify Success (push) Failing after 1m29s
CI/CD Pipeline / Run Tests (pull_request) Successful in 1m44s
CI/CD Pipeline / Code Quality Checks (pull_request) Successful in 20m19s
CI/CD Pipeline / Telegram Notify Success (pull_request) Failing after 1m34s
Add periodic exchange task management API
2026-03-19 17:03:47 +01:00

648 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Сервисы приложения обмена данными."""
from __future__ import annotations
import json
from contextlib import suppress
from typing import Any
from apps.exchange.models import ExchangeConnection
from django.apps import apps as django_apps
from django.conf import settings
from django.core.exceptions import ValidationError as DjangoValidationError
from django.db import IntegrityError, connections, transaction
from django.utils import timezone
from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask
class ExchangeServiceError(ValueError):
    """Raised when an operation of the data-exchange application fails.

    Subclasses ``ValueError``, so existing ``except ValueError`` handlers
    also catch it.
    """
class ExchangeConnectionService:
    """Manage exchange connections and copy parser data to a target database.

    Every operation is a class method. The target database is reached through
    a Django connection alias that is registered at runtime from the fields of
    an ``ExchangeConnection`` record (see ``_configure_alias``). The service
    reads the target structure and writes rows; it does not create or alter
    tables in the target database.
    """

    # "app_label.ModelName" labels of the parser models that can be copied,
    # in the order in which they are copied.
    PARSER_MODEL_LABELS = [
        "parsers.ParserLoadLog",
        "parsers.IndustrialCertificateRecord",
        "parsers.ManufacturerRecord",
        "parsers.Proxy",
        "parsers.InspectionRecord",
        "parsers.ProcurementRecord",
        "parsers.FinancialReport",
        "parsers.FinancialReportLine",
    ]

    @classmethod
    def test_connection_payload(cls, **payload) -> dict[str, str]:
        """Check connectivity and target structure for a not-yet-saved payload.

        An in-memory ``ExchangeConnection`` (``is_active=False``) is built from
        ``payload``; this method does not call ``save()`` on it itself.

        Args:
            **payload: Field values passed to the ``ExchangeConnection``
                constructor.

        Returns:
            A dict with ``"status"`` and ``"message"`` keys on success.

        Raises:
            ExchangeServiceError: If the connection attempt or the structure
                validation fails.
        """
        connection = ExchangeConnection(is_active=False, **payload)
        try:
            alias = cls.test_connection(connection)
            cls.validate_target_structure(
                connection=connection,
                alias=alias,
                schema_name=connection.schema_name,
            )
        finally:
            # Release the temporary alias on failure as well, otherwise a
            # failed check leaves the alias registered (and possibly a
            # connection open) for the lifetime of the process.
            cls._discard_alias(cls._alias_name(connection))
        return {
            "status": "success",
            "message": "Подключение и структура целевой БД валидны.",
        }

    @classmethod
    @transaction.atomic
    def create_active_connection_and_prepare(cls, **payload) -> ExchangeConnection:
        """Create a new active connection and verify the target database.

        Within a single transaction:

        1. Deactivate every currently active connection.
        2. Save the new connection as active.
        3. Check connectivity and the target database structure.

        The target database structure is only inspected, never modified.

        Args:
            **payload: Field values passed to
                ``ExchangeConnection.objects.create``.

        Returns:
            The saved ``ExchangeConnection`` with ``last_checked_at`` updated
            and ``last_error`` cleared.

        Raises:
            ExchangeServiceError: If the connectivity or structure check
                fails. Because the method runs inside ``transaction.atomic``,
                the raised exception rolls the transaction back.
        """
        ExchangeConnection.objects.filter(is_active=True).update(is_active=False)
        connection = ExchangeConnection.objects.create(is_active=True, **payload)
        try:
            alias = cls.test_connection(connection)
            cls.validate_target_structure(
                connection=connection,
                alias=alias,
                schema_name=connection.schema_name,
            )
        except Exception as exc:  # noqa: BLE001
            raise ExchangeServiceError(str(exc)) from exc
        connection.last_checked_at = timezone.now()
        connection.last_error = ""
        connection.save(update_fields=["last_checked_at", "last_error", "updated_at"])
        return connection

    @classmethod
    def get_active_connection(cls) -> ExchangeConnection:
        """Return the first connection flagged ``is_active=True``.

        Raises:
            ExchangeServiceError: If there is no active connection.
        """
        connection = ExchangeConnection.objects.filter(is_active=True).first()
        if not connection:
            raise ExchangeServiceError("Активное подключение не найдено")
        return connection

    @classmethod
    def test_connection(cls, connection: ExchangeConnection) -> str:
        """Register the alias for ``connection`` and run ``SELECT 1`` on it.

        Returns:
            The database alias that was configured for ``connection``.

        Raises:
            ExchangeServiceError: If connecting or executing the probe query
                fails; the error is also recorded on ``connection``.
        """
        alias = cls._configure_alias(connection)
        try:
            db_connection = connections[alias]
            db_connection.ensure_connection()
            with db_connection.cursor() as cursor:
                cursor.execute("SELECT 1")
        except Exception as exc:  # noqa: BLE001
            cls._mark_connection_error(connection, str(exc))
            raise ExchangeServiceError(
                f"Ошибка подключения к целевой БД: {exc}"
            ) from exc
        return alias

    @classmethod
    def validate_target_structure(
        cls,
        *,
        connection: ExchangeConnection,
        alias: str,
        schema_name: str,
        models_to_copy: list | None = None,
    ) -> None:
        """Validate the target database structure without changing it.

        Checks, in order, that:

        - the schema exists;
        - every required table exists in that schema;
        - every required column exists in those tables.

        Args:
            connection: Record on which a failure is recorded.
            alias: Already configured database alias of the target database.
            schema_name: Schema expected to contain the tables.
            models_to_copy: Models whose tables/columns are required. When
                ``None`` or empty, all parser models plus their dependencies
                are required.

        Raises:
            ExchangeServiceError: If any check fails or an unexpected error
                occurs while checking.
        """
        try:
            db_connection = connections[alias]
            db_connection.ensure_connection()
            required_models = models_to_copy or cls._extend_models_with_dependencies(
                cls._get_parser_models()
            )
            cls._validate_schema_exists(alias=alias, schema_name=schema_name)
            cls._validate_tables_exist(
                alias=alias,
                schema_name=schema_name,
                models_to_copy=required_models,
            )
            cls._validate_columns_exist(
                alias=alias,
                schema_name=schema_name,
                models_to_copy=required_models,
            )
        except ExchangeServiceError as exc:
            # Already a domain error: record it and re-raise unchanged.
            cls._mark_connection_error(connection, str(exc))
            raise
        except Exception as exc:  # noqa: BLE001
            cls._mark_connection_error(connection, str(exc))
            raise ExchangeServiceError(
                f"Ошибка проверки структуры целевой БД: {exc}"
            ) from exc

    @classmethod
    def copy_parsers_data(
        cls,
        *,
        connection: ExchangeConnection,
        mode: str,
        table: str | None = None,
        tables: list[str] | None = None,
        truncate_before_copy: bool = True,
    ) -> dict[str, Any]:
        """Copy data from the local (``default``) database to the target one.

        Args:
            connection: Connection record describing the target database.
            mode: ``"all"`` copies every parser model, ``"single"`` copies
                ``table``, any other value copies ``tables``.
            table: Table/model name used when ``mode == "single"``.
            tables: Table/model names used for the remaining modes.
            truncate_before_copy: When true, target tables are truncated
                first and rows are inserted without conflict handling;
                otherwise rows whose insert conflicts are skipped.

        Returns:
            A dict with the keys ``mode``, ``tables``, ``rows_by_table``,
            ``total_rows`` and ``truncate_before_copy``.

        Raises:
            ExchangeServiceError: If a requested table is unknown, the target
                database cannot be reached, or its structure is invalid.
        """
        alias = cls._configure_alias(connection)
        selected_models = cls._resolve_models(mode=mode, table=table, tables=tables)
        models_to_copy = cls._extend_models_with_dependencies(selected_models)
        try:
            connections[alias].ensure_connection()
        except Exception as exc:  # noqa: BLE001
            cls._mark_connection_error(connection, str(exc))
            raise ExchangeServiceError(
                f"Ошибка подключения к целевой БД: {exc}"
            ) from exc
        cls.validate_target_structure(
            connection=connection,
            alias=alias,
            schema_name=connection.schema_name,
            models_to_copy=models_to_copy,
        )
        if truncate_before_copy:
            cls._truncate_tables(alias=alias, models_to_copy=models_to_copy)
        copied_by_table: dict[str, int] = {}
        for model in models_to_copy:
            copied_by_table[model._meta.db_table] = cls._copy_model_data(
                model=model,
                alias=alias,
                truncate_before_copy=truncate_before_copy,
            )
        total_rows = sum(copied_by_table.values())
        connection.last_checked_at = timezone.now()
        connection.last_error = ""
        connection.save(update_fields=["last_checked_at", "last_error", "updated_at"])
        return {
            "mode": mode,
            "tables": list(copied_by_table.keys()),
            "rows_by_table": copied_by_table,
            "total_rows": total_rows,
            "truncate_before_copy": truncate_before_copy,
        }

    @classmethod
    def _alias_name(cls, connection: ExchangeConnection) -> str:
        """Return the runtime database alias used for ``connection``."""
        return f"exchange_target_{connection.id}"

    @classmethod
    def _discard_alias(cls, alias: str) -> None:
        """Best-effort close of ``alias`` and removal of its settings entry.

        Errors are deliberately suppressed: this is cleanup and must not mask
        the outcome of the operation it follows.
        """
        with suppress(Exception):
            connections[alias].close()
        with suppress(Exception):
            connections.databases.pop(alias, None)

    @classmethod
    def _configure_alias(cls, connection: ExchangeConnection) -> str:
        """Register (or re-register) the database alias for ``connection``.

        The alias is configured for the PostgreSQL backend with
        ``search_path`` set to the connection's schema followed by ``public``.

        Returns:
            The alias name.
        """
        alias = cls._alias_name(connection)
        config = {
            "ENGINE": "django.db.backends.postgresql",
            "NAME": connection.database_name,
            "USER": connection.username,
            "PASSWORD": connection.get_decrypted_password(),
            "HOST": connection.server,
            "PORT": connection.port,
            "OPTIONS": {
                "options": f"-c search_path={connection.schema_name},public",
            },
            "CONN_MAX_AGE": 0,
            "ATOMIC_REQUESTS": False,
            "AUTOCOMMIT": True,
            "TIME_ZONE": None,
            "TEST": {},
        }
        if alias in connections.databases:
            # Close a connection opened with the previous settings, if any.
            with suppress(Exception):
                connections[alias].close()
        connections.databases[alias] = config
        # NOTE(review): relies on the private ``connections._connections``
        # storage; presumably this drops a cached connection object so the
        # next access is built from the new settings - confirm against the
        # Django version in use.
        storage = getattr(connections, "_connections", None)
        if storage is not None and hasattr(storage, "__dict__"):
            storage.__dict__.pop(alias, None)
        return alias

    @classmethod
    def _validate_schema_exists(cls, *, alias: str, schema_name: str) -> None:
        """Raise ``ExchangeServiceError`` if ``schema_name`` is not present."""
        with connections[alias].cursor() as cursor:
            cursor.execute(
                """
                SELECT 1
                FROM information_schema.schemata
                WHERE schema_name = %s
                """,
                [schema_name],
            )
            if cursor.fetchone() is None:
                raise ExchangeServiceError(
                    f"Схема '{schema_name}' отсутствует в целевой БД"
                )

    @classmethod
    def _validate_tables_exist(
        cls,
        *,
        alias: str,
        schema_name: str,
        models_to_copy: list,
    ) -> None:
        """Raise ``ExchangeServiceError`` listing tables missing in the schema."""
        expected_tables = {model._meta.db_table for model in models_to_copy}
        with connections[alias].cursor() as cursor:
            cursor.execute(
                """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = %s
                """,
                [schema_name],
            )
            existing_tables = {row[0] for row in cursor.fetchall()}
        missing_tables = sorted(expected_tables - existing_tables)
        if missing_tables:
            raise ExchangeServiceError(
                "В целевой БД отсутствуют таблицы: " + ", ".join(missing_tables)
            )

    @classmethod
    def _validate_columns_exist(
        cls,
        *,
        alias: str,
        schema_name: str,
        models_to_copy: list,
    ) -> None:
        """Raise ``ExchangeServiceError`` for the first table missing columns.

        The expected columns of a table are the ``column`` names of the
        model's local fields.
        """
        for model in models_to_copy:
            table_name = model._meta.db_table
            expected_columns = {field.column for field in model._meta.local_fields}
            with connections[alias].cursor() as cursor:
                cursor.execute(
                    """
                    SELECT column_name
                    FROM information_schema.columns
                    WHERE table_schema = %s AND table_name = %s
                    """,
                    [schema_name, table_name],
                )
                existing_columns = {row[0] for row in cursor.fetchall()}
            missing_columns = sorted(expected_columns - existing_columns)
            if missing_columns:
                raise ExchangeServiceError(
                    f"В таблице '{table_name}' отсутствуют колонки: "
                    + ", ".join(missing_columns)
                )

    @classmethod
    def _get_parser_models(cls) -> list:
        """Return the model classes named in ``PARSER_MODEL_LABELS``."""
        return [django_apps.get_model(label) for label in cls.PARSER_MODEL_LABELS]

    @classmethod
    def _resolve_models(
        cls,
        *,
        mode: str,
        table: str | None,
        tables: list[str] | None,
    ) -> list:
        """Translate the requested table names into parser model classes.

        A model can be requested by its ``db_table``, its ``model_name`` or
        its lower-cased class name. Each model appears at most once in the
        result, in the order of its first occurrence in the request.

        Args:
            mode: ``"all"`` returns every parser model; ``"single"`` resolves
                ``table``; any other value resolves ``tables``.
            table: Name used in ``"single"`` mode (ignored if empty).
            tables: Names used in the remaining modes.

        Raises:
            ExchangeServiceError: If a requested name matches no parser model.
        """
        parser_models = cls._get_parser_models()
        if mode == "all":
            return parser_models
        mapping: dict[str, Any] = {}
        for model in parser_models:
            mapping[model._meta.db_table] = model
            mapping[model._meta.model_name] = model
            mapping[model.__name__.lower()] = model
        requested_names: list[str]
        if mode == "single":
            requested_names = [table] if table else []
        else:
            requested_names = tables or []
        resolved_models = []
        seen = set()
        for requested_name in requested_names:
            model = mapping.get(requested_name)
            if model is None:
                available = ", ".join(sorted(m._meta.db_table for m in parser_models))
                raise ExchangeServiceError(
                    f"Неизвестная таблица '{requested_name}'. Доступные: {available}"
                )
            # The same model may be requested under several names; copying it
            # twice would re-insert the same primary keys.
            if model in seen:
                continue
            seen.add(model)
            resolved_models.append(model)
        return resolved_models

    @classmethod
    def _extend_models_with_dependencies(cls, models_to_copy: list) -> list:
        """Prepend the models that must be copied for the copy to succeed.

        If any model has a local field named ``registry_organization``,
        ``registers.Organization`` is placed first and duplicates are removed
        while preserving order. Otherwise the input list is returned as is.
        """
        if not cls._requires_registry_organizations(models_to_copy):
            return models_to_copy
        organization_model = django_apps.get_model("registers.Organization")
        ordered_models = [organization_model, *models_to_copy]
        unique_models = []
        seen = set()
        for model in ordered_models:
            model_key = (model._meta.app_label, model._meta.model_name)
            if model_key in seen:
                continue
            seen.add(model_key)
            unique_models.append(model)
        return unique_models

    @classmethod
    def _requires_registry_organizations(cls, models_to_copy: list) -> bool:
        """Return whether any model has a ``registry_organization`` field."""
        return any(
            any(
                field.name == "registry_organization"
                for field in model._meta.local_fields
            )
            for model in models_to_copy
        )

    @classmethod
    def _truncate_tables(cls, *, alias: str, models_to_copy: list) -> None:
        """Truncate the models' tables in the target DB, in reverse order.

        Uses ``RESTART IDENTITY CASCADE``. Table names come from model
        metadata, not from user input.
        """
        with connections[alias].cursor() as cursor:
            for model in reversed(models_to_copy):
                cursor.execute(
                    f'TRUNCATE TABLE "{model._meta.db_table}" RESTART IDENTITY CASCADE'
                )

    @classmethod
    def _copy_model_data(
        cls,
        *,
        model,
        alias: str,
        truncate_before_copy: bool,
        chunk_size: int = 1000,
    ) -> int:
        """Copy all rows of ``model`` from ``default`` to ``alias`` in batches.

        Rows are read ordered by primary key and re-created from the values
        of the model's local fields.

        Returns:
            The sum of the per-batch counts reported by ``_insert_batch``.
        """
        field_names = [field.attname for field in model._meta.local_fields]
        queryset = model.objects.using("default").all().order_by("pk")
        total_processed = 0
        batch = []
        pk_name = model._meta.pk.attname
        for source_obj in queryset.iterator(chunk_size=chunk_size):
            row_data = {
                field_name: getattr(source_obj, field_name)
                for field_name in field_names
            }
            batch.append(model(**row_data))
            if len(batch) >= chunk_size:
                total_processed += cls._insert_batch(
                    model=model,
                    alias=alias,
                    batch=batch,
                    pk_name=pk_name,
                    chunk_size=chunk_size,
                    truncate_before_copy=truncate_before_copy,
                )
                batch = []
        if batch:
            # Flush the final, partially filled batch.
            total_processed += cls._insert_batch(
                model=model,
                alias=alias,
                batch=batch,
                pk_name=pk_name,
                chunk_size=chunk_size,
                truncate_before_copy=truncate_before_copy,
            )
        return total_processed

    @classmethod
    def _insert_batch(
        cls,
        *,
        model,
        alias: str,
        batch: list,
        pk_name: str,
        chunk_size: int,
        truncate_before_copy: bool,
    ) -> int:
        """Insert ``batch`` into the target database and return the row count.

        When ``truncate_before_copy`` is true, rows are inserted with
        ``ignore_conflicts=False`` and the batch length is returned. Otherwise
        rows are inserted with ``ignore_conflicts=True`` and the count is the
        number of batch primary keys present after the insert that were not
        present before it.
        """
        if truncate_before_copy:
            model.objects.using(alias).bulk_create(
                batch,
                batch_size=chunk_size,
                ignore_conflicts=False,
            )
            return len(batch)
        pk_values = [getattr(item, pk_name) for item in batch]
        existing_before = set(
            model.objects.using(alias)
            .filter(**{f"{pk_name}__in": pk_values})
            .values_list(pk_name, flat=True)
        )
        model.objects.using(alias).bulk_create(
            batch,
            batch_size=chunk_size,
            ignore_conflicts=True,
        )
        existing_after = set(
            model.objects.using(alias)
            .filter(**{f"{pk_name}__in": pk_values})
            .values_list(pk_name, flat=True)
        )
        return len(existing_after - existing_before)

    @classmethod
    def _mark_connection_error(
        cls, connection: ExchangeConnection, error_message: str
    ) -> None:
        """Record a failed check on ``connection``.

        The attributes are always set on the instance; the record is saved
        only if it has a primary key.
        """
        connection.last_checked_at = timezone.now()
        connection.last_error = error_message
        if connection.pk:
            connection.save(
                update_fields=["last_checked_at", "last_error", "updated_at"]
            )
class ExchangePeriodicTaskService:
    """Manage the periodic exchange tasks stored as ``PeriodicTask`` rows.

    Only tasks whose ``task`` field equals ``TASK_NAME`` are listed by
    ``get_queryset``; tasks created here always use that name.
    """

    # Dotted name stored in ``PeriodicTask.task`` for every task managed here.
    TASK_NAME = "apps.exchange.tasks.dispatch_periodic_exchange_copy"

    @classmethod
    def get_queryset(cls):
        """Return the managed periodic tasks ordered by name.

        The ``interval`` and ``crontab`` relations are loaded with
        ``select_related``.
        """
        return (
            PeriodicTask.objects.filter(task=cls.TASK_NAME)
            .select_related("interval", "crontab")
            .order_by("name")
        )

    @classmethod
    @transaction.atomic
    def create_periodic_task(
        cls,
        *,
        name: str,
        payload: dict[str, Any],
        schedule: dict[str, Any],
        description: str = "",
        enabled: bool = True,
    ) -> PeriodicTask:
        """Create a periodic exchange task.

        Args:
            name: Task name.
            payload: Stored as JSON in the task's ``kwargs`` under the
                ``"payload"`` key.
            schedule: Schedule description; ``schedule["type"]`` selects the
                schedule kind (see ``_assign_schedule``).
            description: Free-form task description.
            enabled: Whether the task is enabled.

        Returns:
            The saved ``PeriodicTask``.

        Raises:
            ExchangeServiceError: If the schedule or the task fails
                validation, or saving hits an integrity error.
        """
        task = PeriodicTask(
            name=name,
            task=cls.TASK_NAME,
            kwargs=json.dumps({"payload": payload}, ensure_ascii=False),
            description=description,
            enabled=enabled,
        )
        cls._assign_schedule(task=task, schedule=schedule)
        return cls._save_task(task)

    @classmethod
    @transaction.atomic
    def update_periodic_task(
        cls,
        *,
        task: PeriodicTask,
        payload: dict[str, Any],
        schedule: dict[str, Any],
        name: str | None = None,
        description: str | None = None,
        enabled: bool | None = None,
    ) -> PeriodicTask:
        """Update a periodic exchange task in place.

        ``payload`` and ``schedule`` are always replaced; ``name``,
        ``description`` and ``enabled`` are changed only when not ``None``.
        After saving, the previously assigned interval/crontab rows are
        deleted if no ``PeriodicTask`` references them any more.

        Returns:
            The saved ``PeriodicTask``.

        Raises:
            ExchangeServiceError: If the schedule or the task fails
                validation, or saving hits an integrity error.
        """
        # Remember the old schedule rows so they can be cleaned up afterwards.
        old_interval_id = task.interval_id
        old_crontab_id = task.crontab_id
        if name is not None:
            task.name = name
        if description is not None:
            task.description = description
        if enabled is not None:
            task.enabled = enabled
        task.kwargs = json.dumps({"payload": payload}, ensure_ascii=False)
        cls._assign_schedule(task=task, schedule=schedule)
        task = cls._save_task(task)
        cls._cleanup_unused_interval(old_interval_id)
        cls._cleanup_unused_crontab(old_crontab_id)
        return task

    @classmethod
    def _assign_schedule(cls, *, task: PeriodicTask, schedule: dict[str, Any]) -> None:
        """Attach exactly one schedule to ``task``.

        ``schedule["type"] == "interval"`` assigns an interval schedule and
        clears the crontab; any other type assigns a crontab schedule and
        clears the interval.
        """
        if schedule["type"] == "interval":
            task.interval = cls._get_or_create_interval(schedule)
            task.crontab = None
            return
        task.crontab = cls._get_or_create_crontab(schedule)
        task.interval = None

    @classmethod
    def _get_or_create_schedule(cls, model_cls, **lookup):
        """Return a ``model_cls`` row matching ``lookup``, creating one if needed.

        ``get_or_create`` raises ``MultipleObjectsReturned`` when several
        identical schedule rows already exist; in that case the matching row
        with the lowest primary key is reused instead of failing.
        """
        try:
            instance, _ = model_cls.objects.get_or_create(**lookup)
        except model_cls.MultipleObjectsReturned:
            instance = model_cls.objects.filter(**lookup).order_by("pk").first()
        return instance

    @classmethod
    def _get_or_create_interval(cls, schedule: dict[str, Any]) -> IntervalSchedule:
        """Validate and fetch/create the interval described by ``schedule``.

        Reads ``schedule["every"]`` and ``schedule["period"]``.

        Raises:
            ExchangeServiceError: If the interval values fail validation.
        """
        interval = IntervalSchedule(
            every=schedule["every"],
            period=schedule["period"],
        )
        # Validate on an unsaved instance first so invalid values are
        # reported as a domain error before the database is touched.
        cls._validate_model(interval)
        return cls._get_or_create_schedule(
            IntervalSchedule,
            every=interval.every,
            period=interval.period,
        )

    @classmethod
    def _get_or_create_crontab(cls, schedule: dict[str, Any]) -> CrontabSchedule:
        """Validate and fetch/create the crontab described by ``schedule``.

        Reads the ``minute``, ``hour``, ``day_of_week``, ``day_of_month`` and
        ``month_of_year`` keys; the timezone is taken from
        ``settings.TIME_ZONE``.

        Raises:
            ExchangeServiceError: If the crontab values fail validation.
        """
        crontab = CrontabSchedule(
            minute=schedule["minute"],
            hour=schedule["hour"],
            day_of_week=schedule["day_of_week"],
            day_of_month=schedule["day_of_month"],
            month_of_year=schedule["month_of_year"],
            timezone=settings.TIME_ZONE,
        )
        cls._validate_model(crontab)
        return cls._get_or_create_schedule(
            CrontabSchedule,
            minute=crontab.minute,
            hour=crontab.hour,
            day_of_week=crontab.day_of_week,
            day_of_month=crontab.day_of_month,
            month_of_year=crontab.month_of_year,
            timezone=crontab.timezone,
        )

    @classmethod
    def _save_task(cls, task: PeriodicTask) -> PeriodicTask:
        """Validate and save ``task``, converting errors to domain errors.

        Raises:
            ExchangeServiceError: On a validation error (message built by
                ``_format_validation_error``) or on an ``IntegrityError``.
        """
        try:
            task.full_clean()
            task.save()
        except DjangoValidationError as exc:
            raise ExchangeServiceError(cls._format_validation_error(exc)) from exc
        except IntegrityError as exc:
            raise ExchangeServiceError(
                "Периодическая задача с таким именем уже существует"
            ) from exc
        return task

    @classmethod
    def _validate_model(cls, instance) -> None:
        """Run ``full_clean`` on ``instance``.

        Raises:
            ExchangeServiceError: If validation fails.
        """
        try:
            instance.full_clean()
        except DjangoValidationError as exc:
            raise ExchangeServiceError(cls._format_validation_error(exc)) from exc

    @classmethod
    def _format_validation_error(cls, exc: DjangoValidationError) -> str:
        """Flatten a validation error into a single ``"; "``-joined string.

        Field-keyed errors are rendered as ``"field: error"``; otherwise the
        plain messages are joined.
        """
        if hasattr(exc, "message_dict"):
            messages = []
            for field, field_errors in exc.message_dict.items():
                messages.extend(f"{field}: {error}" for error in field_errors)
            return "; ".join(messages)
        return "; ".join(exc.messages)

    @classmethod
    def _cleanup_unused_interval(cls, interval_id: int | None) -> None:
        """Delete the interval row if no ``PeriodicTask`` references it."""
        if not interval_id:
            return
        if PeriodicTask.objects.filter(interval_id=interval_id).exists():
            return
        IntervalSchedule.objects.filter(id=interval_id).delete()

    @classmethod
    def _cleanup_unused_crontab(cls, crontab_id: int | None) -> None:
        """Delete the crontab row if no ``PeriodicTask`` references it."""
        if not crontab_id:
            return
        if PeriodicTask.objects.filter(crontab_id=crontab_id).exists():
            return
        CrontabSchedule.objects.filter(id=crontab_id).delete()