feat(registry): add new endpoints for registers, exchange, and backups; update routing and configurations
Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 3m10s
CI/CD Pipeline / Run Tests (push) Successful in 3m35s
CI/CD Pipeline / Telegram Notify Success (push) Has been skipped
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 2m26s
CI/CD Pipeline / Run Tests (pull_request) Successful in 2m46s
CI/CD Pipeline / Telegram Notify Success (pull_request) Has been skipped
Some checks failed
CI/CD Pipeline / Code Quality Checks (push) Failing after 3m10s
CI/CD Pipeline / Run Tests (push) Successful in 3m35s
CI/CD Pipeline / Telegram Notify Success (push) Has been skipped
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 2m26s
CI/CD Pipeline / Run Tests (pull_request) Successful in 2m46s
CI/CD Pipeline / Telegram Notify Success (pull_request) Has been skipped
This commit is contained in:
447
src/apps/exchange/services.py
Normal file
447
src/apps/exchange/services.py
Normal file
@@ -0,0 +1,447 @@
|
||||
"""Сервисы приложения обмена данными."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import suppress
|
||||
from typing import Any
|
||||
|
||||
from apps.exchange.models import ExchangeConnection
|
||||
from django.apps import apps as django_apps
|
||||
from django.db import connections, transaction
|
||||
from django.utils import timezone
|
||||
|
||||
|
||||
class ExchangeServiceError(ValueError):
|
||||
"""Ошибка операций приложения обмена данными."""
|
||||
|
||||
|
||||
class ExchangeConnectionService:
|
||||
"""Сервис управления подключениями и синхронизацией данных."""
|
||||
|
||||
PARSER_MODEL_LABELS = [
|
||||
"parsers.ParserLoadLog",
|
||||
"parsers.IndustrialCertificateRecord",
|
||||
"parsers.ManufacturerRecord",
|
||||
"parsers.Proxy",
|
||||
"parsers.InspectionRecord",
|
||||
"parsers.ProcurementRecord",
|
||||
"parsers.FinancialReport",
|
||||
"parsers.FinancialReportLine",
|
||||
]
|
||||
|
||||
@classmethod
|
||||
@transaction.atomic
|
||||
def create_active_connection_and_prepare(cls, **payload) -> ExchangeConnection:
|
||||
"""
|
||||
Создать активное подключение.
|
||||
|
||||
В рамках одной операции:
|
||||
1. Деактивировать текущее активное подключение.
|
||||
2. Сохранить новое как активное.
|
||||
3. Проверить соединение и структуру target DB.
|
||||
|
||||
Важно: сервис НЕ изменяет структуру target DB.
|
||||
"""
|
||||
ExchangeConnection.objects.filter(is_active=True).update(is_active=False)
|
||||
connection = ExchangeConnection.objects.create(is_active=True, **payload)
|
||||
|
||||
try:
|
||||
alias = cls.test_connection(connection)
|
||||
cls.validate_target_structure(
|
||||
connection=connection,
|
||||
alias=alias,
|
||||
schema_name=connection.schema_name,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
raise ExchangeServiceError(str(exc)) from exc
|
||||
|
||||
connection.last_checked_at = timezone.now()
|
||||
connection.last_error = ""
|
||||
connection.save(update_fields=["last_checked_at", "last_error", "updated_at"])
|
||||
|
||||
return connection
|
||||
|
||||
@classmethod
|
||||
def get_active_connection(cls) -> ExchangeConnection:
|
||||
connection = ExchangeConnection.objects.filter(is_active=True).first()
|
||||
if not connection:
|
||||
raise ExchangeServiceError("Активное подключение не найдено")
|
||||
return connection
|
||||
|
||||
@classmethod
|
||||
def test_connection(cls, connection: ExchangeConnection) -> str:
|
||||
alias = cls._configure_alias(connection)
|
||||
|
||||
try:
|
||||
db_connection = connections[alias]
|
||||
db_connection.ensure_connection()
|
||||
with db_connection.cursor() as cursor:
|
||||
cursor.execute("SELECT 1")
|
||||
except Exception as exc: # noqa: BLE001
|
||||
cls._mark_connection_error(connection, str(exc))
|
||||
raise ExchangeServiceError(f"Ошибка подключения к целевой БД: {exc}") from exc
|
||||
|
||||
return alias
|
||||
|
||||
@classmethod
|
||||
def validate_target_structure(
|
||||
cls,
|
||||
*,
|
||||
connection: ExchangeConnection,
|
||||
alias: str,
|
||||
schema_name: str,
|
||||
models_to_copy: list | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Проверить структуру target DB без изменений.
|
||||
|
||||
Проверяет:
|
||||
- существование схемы
|
||||
- наличие всех обязательных таблиц
|
||||
- наличие всех обязательных колонок в таблицах
|
||||
"""
|
||||
try:
|
||||
db_connection = connections[alias]
|
||||
db_connection.ensure_connection()
|
||||
required_models = models_to_copy or cls._extend_models_with_dependencies(
|
||||
cls._get_parser_models()
|
||||
)
|
||||
cls._validate_schema_exists(alias=alias, schema_name=schema_name)
|
||||
cls._validate_tables_exist(
|
||||
alias=alias,
|
||||
schema_name=schema_name,
|
||||
models_to_copy=required_models,
|
||||
)
|
||||
cls._validate_columns_exist(
|
||||
alias=alias,
|
||||
schema_name=schema_name,
|
||||
models_to_copy=required_models,
|
||||
)
|
||||
except ExchangeServiceError as exc:
|
||||
cls._mark_connection_error(connection, str(exc))
|
||||
raise
|
||||
except Exception as exc: # noqa: BLE001
|
||||
cls._mark_connection_error(connection, str(exc))
|
||||
raise ExchangeServiceError(
|
||||
f"Ошибка проверки структуры целевой БД: {exc}"
|
||||
) from exc
|
||||
|
||||
@classmethod
|
||||
def copy_parsers_data(
|
||||
cls,
|
||||
*,
|
||||
connection: ExchangeConnection,
|
||||
mode: str,
|
||||
table: str | None = None,
|
||||
tables: list[str] | None = None,
|
||||
truncate_before_copy: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
"""Скопировать данные из локальной БД в целевую БД."""
|
||||
alias = cls._configure_alias(connection)
|
||||
selected_models = cls._resolve_models(mode=mode, table=table, tables=tables)
|
||||
models_to_copy = cls._extend_models_with_dependencies(selected_models)
|
||||
|
||||
try:
|
||||
connections[alias].ensure_connection()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
cls._mark_connection_error(connection, str(exc))
|
||||
raise ExchangeServiceError(f"Ошибка подключения к целевой БД: {exc}") from exc
|
||||
|
||||
cls.validate_target_structure(
|
||||
connection=connection,
|
||||
alias=alias,
|
||||
schema_name=connection.schema_name,
|
||||
models_to_copy=models_to_copy,
|
||||
)
|
||||
|
||||
if truncate_before_copy:
|
||||
cls._truncate_tables(alias=alias, models_to_copy=models_to_copy)
|
||||
|
||||
copied_by_table: dict[str, int] = {}
|
||||
for model in models_to_copy:
|
||||
copied_by_table[model._meta.db_table] = cls._copy_model_data(
|
||||
model=model,
|
||||
alias=alias,
|
||||
truncate_before_copy=truncate_before_copy,
|
||||
)
|
||||
|
||||
total_rows = sum(copied_by_table.values())
|
||||
|
||||
connection.last_checked_at = timezone.now()
|
||||
connection.last_error = ""
|
||||
connection.save(update_fields=["last_checked_at", "last_error", "updated_at"])
|
||||
|
||||
return {
|
||||
"mode": mode,
|
||||
"tables": list(copied_by_table.keys()),
|
||||
"rows_by_table": copied_by_table,
|
||||
"total_rows": total_rows,
|
||||
"truncate_before_copy": truncate_before_copy,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _configure_alias(cls, connection: ExchangeConnection) -> str:
|
||||
alias = f"exchange_target_{connection.id}"
|
||||
|
||||
config = {
|
||||
"ENGINE": "django.db.backends.postgresql",
|
||||
"NAME": connection.database_name,
|
||||
"USER": connection.username,
|
||||
"PASSWORD": connection.password,
|
||||
"HOST": connection.server,
|
||||
"PORT": connection.port,
|
||||
"OPTIONS": {
|
||||
"options": f"-c search_path={connection.schema_name},public",
|
||||
},
|
||||
"CONN_MAX_AGE": 0,
|
||||
"ATOMIC_REQUESTS": False,
|
||||
"AUTOCOMMIT": True,
|
||||
"TIME_ZONE": None,
|
||||
"TEST": {},
|
||||
}
|
||||
|
||||
if alias in connections.databases:
|
||||
with suppress(Exception):
|
||||
connections[alias].close()
|
||||
|
||||
connections.databases[alias] = config
|
||||
|
||||
storage = getattr(connections, "_connections", None)
|
||||
if storage is not None and hasattr(storage, "__dict__"):
|
||||
storage.__dict__.pop(alias, None)
|
||||
|
||||
return alias
|
||||
|
||||
@classmethod
|
||||
def _validate_schema_exists(cls, *, alias: str, schema_name: str) -> None:
|
||||
with connections[alias].cursor() as cursor:
|
||||
cursor.execute(
|
||||
"""
|
||||
SELECT 1
|
||||
FROM information_schema.schemata
|
||||
WHERE schema_name = %s
|
||||
""",
|
||||
[schema_name],
|
||||
)
|
||||
if cursor.fetchone() is None:
|
||||
raise ExchangeServiceError(
|
||||
f"Схема '{schema_name}' отсутствует в целевой БД"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _validate_tables_exist(
|
||||
cls,
|
||||
*,
|
||||
alias: str,
|
||||
schema_name: str,
|
||||
models_to_copy: list,
|
||||
) -> None:
|
||||
expected_tables = {model._meta.db_table for model in models_to_copy}
|
||||
with connections[alias].cursor() as cursor:
|
||||
cursor.execute(
|
||||
"""
|
||||
SELECT table_name
|
||||
FROM information_schema.tables
|
||||
WHERE table_schema = %s
|
||||
""",
|
||||
[schema_name],
|
||||
)
|
||||
existing_tables = {row[0] for row in cursor.fetchall()}
|
||||
|
||||
missing_tables = sorted(expected_tables - existing_tables)
|
||||
if missing_tables:
|
||||
raise ExchangeServiceError(
|
||||
"В целевой БД отсутствуют таблицы: " + ", ".join(missing_tables)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _validate_columns_exist(
|
||||
cls,
|
||||
*,
|
||||
alias: str,
|
||||
schema_name: str,
|
||||
models_to_copy: list,
|
||||
) -> None:
|
||||
for model in models_to_copy:
|
||||
table_name = model._meta.db_table
|
||||
expected_columns = {field.column for field in model._meta.local_fields}
|
||||
|
||||
with connections[alias].cursor() as cursor:
|
||||
cursor.execute(
|
||||
"""
|
||||
SELECT column_name
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = %s AND table_name = %s
|
||||
""",
|
||||
[schema_name, table_name],
|
||||
)
|
||||
existing_columns = {row[0] for row in cursor.fetchall()}
|
||||
|
||||
missing_columns = sorted(expected_columns - existing_columns)
|
||||
if missing_columns:
|
||||
raise ExchangeServiceError(
|
||||
f"В таблице '{table_name}' отсутствуют колонки: "
|
||||
+ ", ".join(missing_columns)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _get_parser_models(cls) -> list:
|
||||
return [django_apps.get_model(label) for label in cls.PARSER_MODEL_LABELS]
|
||||
|
||||
@classmethod
|
||||
def _resolve_models(
|
||||
cls,
|
||||
*,
|
||||
mode: str,
|
||||
table: str | None,
|
||||
tables: list[str] | None,
|
||||
) -> list:
|
||||
parser_models = cls._get_parser_models()
|
||||
|
||||
if mode == "all":
|
||||
return parser_models
|
||||
|
||||
mapping: dict[str, Any] = {}
|
||||
for model in parser_models:
|
||||
mapping[model._meta.db_table] = model
|
||||
mapping[model._meta.model_name] = model
|
||||
mapping[model.__name__.lower()] = model
|
||||
|
||||
requested_names: list[str]
|
||||
if mode == "single":
|
||||
requested_names = [table] if table else []
|
||||
else:
|
||||
requested_names = tables or []
|
||||
|
||||
resolved_models = []
|
||||
for requested_name in requested_names:
|
||||
model = mapping.get(requested_name)
|
||||
if not model:
|
||||
available = ", ".join(sorted(m._meta.db_table for m in parser_models))
|
||||
raise ExchangeServiceError(
|
||||
f"Неизвестная таблица '{requested_name}'. Доступные: {available}"
|
||||
)
|
||||
resolved_models.append(model)
|
||||
|
||||
return resolved_models
|
||||
|
||||
@classmethod
|
||||
def _extend_models_with_dependencies(cls, models_to_copy: list) -> list:
|
||||
"""Добавить обязательные зависимые модели для корректного copy."""
|
||||
if not cls._requires_registry_organizations(models_to_copy):
|
||||
return models_to_copy
|
||||
|
||||
organization_model = django_apps.get_model("registers.Organization")
|
||||
ordered_models = [organization_model, *models_to_copy]
|
||||
|
||||
unique_models = []
|
||||
seen = set()
|
||||
for model in ordered_models:
|
||||
model_key = (model._meta.app_label, model._meta.model_name)
|
||||
if model_key in seen:
|
||||
continue
|
||||
seen.add(model_key)
|
||||
unique_models.append(model)
|
||||
|
||||
return unique_models
|
||||
|
||||
@classmethod
|
||||
def _requires_registry_organizations(cls, models_to_copy: list) -> bool:
|
||||
return any(
|
||||
any(field.name == "registry_organization" for field in model._meta.local_fields)
|
||||
for model in models_to_copy
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _truncate_tables(cls, *, alias: str, models_to_copy: list) -> None:
|
||||
with connections[alias].cursor() as cursor:
|
||||
for model in reversed(models_to_copy):
|
||||
cursor.execute(
|
||||
f'TRUNCATE TABLE "{model._meta.db_table}" RESTART IDENTITY CASCADE'
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _copy_model_data(
|
||||
cls,
|
||||
*,
|
||||
model,
|
||||
alias: str,
|
||||
truncate_before_copy: bool,
|
||||
chunk_size: int = 1000,
|
||||
) -> int:
|
||||
field_names = [field.attname for field in model._meta.local_fields]
|
||||
queryset = model.objects.using("default").all().order_by("pk")
|
||||
|
||||
total_processed = 0
|
||||
batch = []
|
||||
pk_name = model._meta.pk.attname
|
||||
|
||||
for source_obj in queryset.iterator(chunk_size=chunk_size):
|
||||
row_data = {field_name: getattr(source_obj, field_name) for field_name in field_names}
|
||||
batch.append(model(**row_data))
|
||||
|
||||
if len(batch) >= chunk_size:
|
||||
total_processed += cls._insert_batch(
|
||||
model=model,
|
||||
alias=alias,
|
||||
batch=batch,
|
||||
pk_name=pk_name,
|
||||
chunk_size=chunk_size,
|
||||
truncate_before_copy=truncate_before_copy,
|
||||
)
|
||||
batch = []
|
||||
|
||||
if batch:
|
||||
total_processed += cls._insert_batch(
|
||||
model=model,
|
||||
alias=alias,
|
||||
batch=batch,
|
||||
pk_name=pk_name,
|
||||
chunk_size=chunk_size,
|
||||
truncate_before_copy=truncate_before_copy,
|
||||
)
|
||||
|
||||
return total_processed
|
||||
|
||||
@classmethod
|
||||
def _insert_batch(
|
||||
cls,
|
||||
*,
|
||||
model,
|
||||
alias: str,
|
||||
batch: list,
|
||||
pk_name: str,
|
||||
chunk_size: int,
|
||||
truncate_before_copy: bool,
|
||||
) -> int:
|
||||
if truncate_before_copy:
|
||||
model.objects.using(alias).bulk_create(
|
||||
batch,
|
||||
batch_size=chunk_size,
|
||||
ignore_conflicts=False,
|
||||
)
|
||||
return len(batch)
|
||||
|
||||
pk_values = [getattr(item, pk_name) for item in batch]
|
||||
existing_before = set(
|
||||
model.objects.using(alias)
|
||||
.filter(**{f"{pk_name}__in": pk_values})
|
||||
.values_list(pk_name, flat=True)
|
||||
)
|
||||
model.objects.using(alias).bulk_create(
|
||||
batch,
|
||||
batch_size=chunk_size,
|
||||
ignore_conflicts=True,
|
||||
)
|
||||
existing_after = set(
|
||||
model.objects.using(alias)
|
||||
.filter(**{f"{pk_name}__in": pk_values})
|
||||
.values_list(pk_name, flat=True)
|
||||
)
|
||||
return len(existing_after - existing_before)
|
||||
|
||||
@classmethod
|
||||
def _mark_connection_error(cls, connection: ExchangeConnection, error_message: str) -> None:
|
||||
connection.last_checked_at = timezone.now()
|
||||
connection.last_error = error_message
|
||||
connection.save(update_fields=["last_checked_at", "last_error", "updated_at"])
|
||||
Reference in New Issue
Block a user