603 lines
23 KiB
Python
603 lines
23 KiB
Python
from __future__ import annotations
|
||
|
||
from contextlib import suppress
|
||
from types import SimpleNamespace
|
||
from unittest.mock import MagicMock, patch
|
||
|
||
from apps.exchange.models import ExchangeConnection
|
||
from apps.exchange.services import ExchangeConnectionService, ExchangeServiceError
|
||
from apps.parsers.models import (
|
||
ManufacturerRecord,
|
||
ParserLoadLog,
|
||
)
|
||
from django.test import TestCase
|
||
|
||
from tests.apps.exchange.factories import ExchangeConnectionFactory
|
||
|
||
|
||
def _db_secret() -> str:
|
||
return "secret"
|
||
|
||
|
||
class _FakeModel:
|
||
_meta = SimpleNamespace(
|
||
app_label="tests",
|
||
model_name="fake_model",
|
||
db_table="fake_table",
|
||
local_fields=[
|
||
SimpleNamespace(attname="id", name="id", column="id"),
|
||
SimpleNamespace(attname="name", name="name", column="name"),
|
||
],
|
||
pk=SimpleNamespace(attname="id"),
|
||
)
|
||
objects = MagicMock()
|
||
|
||
def __init__(self, **kwargs):
|
||
for key, value in kwargs.items():
|
||
setattr(self, key, value)
|
||
|
||
|
||
class _AnotherFakeModel:
|
||
_meta = SimpleNamespace(
|
||
app_label="tests",
|
||
model_name="another_model",
|
||
db_table="another_table",
|
||
local_fields=[SimpleNamespace(attname="id", name="id", column="id")],
|
||
pk=SimpleNamespace(attname="id"),
|
||
)
|
||
|
||
|
||
class ExchangeConnectionServiceUnitTest(TestCase):
|
||
def test_create_active_connection_and_prepare_updates_last_check_on_success(self):
|
||
with patch.object(
|
||
ExchangeConnectionService,
|
||
"test_connection",
|
||
return_value="target_alias",
|
||
) as test_connection_mock, patch.object(
|
||
ExchangeConnectionService,
|
||
"validate_target_structure",
|
||
) as validate_mock:
|
||
connection = ExchangeConnectionService.create_active_connection_and_prepare(
|
||
server="127.0.0.1",
|
||
port=5432,
|
||
username="postgres",
|
||
password=_db_secret(),
|
||
database_name="target_db",
|
||
schema_name="public",
|
||
)
|
||
|
||
self.assertTrue(connection.is_active)
|
||
self.assertIsNotNone(connection.last_checked_at)
|
||
self.assertEqual(connection.last_error, "")
|
||
test_connection_mock.assert_called_once_with(connection)
|
||
validate_mock.assert_called_once_with(
|
||
connection=connection,
|
||
alias="target_alias",
|
||
schema_name="public",
|
||
)
|
||
|
||
def test_test_connection_payload_does_not_persist_connection(self):
|
||
with patch.object(
|
||
ExchangeConnectionService,
|
||
"test_connection",
|
||
return_value="target_alias",
|
||
) as test_connection_mock, patch.object(
|
||
ExchangeConnectionService,
|
||
"validate_target_structure",
|
||
) as validate_mock, patch(
|
||
"apps.exchange.services.connections"
|
||
) as connections_mock:
|
||
connections_mock.databases = {"target_alias": {}}
|
||
result = ExchangeConnectionService.test_connection_payload(
|
||
server="127.0.0.1",
|
||
port=5432,
|
||
username="postgres",
|
||
password=_db_secret(),
|
||
database_name="target_db",
|
||
schema_name="public",
|
||
)
|
||
|
||
self.assertEqual(result["status"], "success")
|
||
self.assertEqual(
|
||
result["message"],
|
||
"Подключение проверено. Соединение и структура БД валидны.",
|
||
)
|
||
self.assertEqual(ExchangeConnection.objects.count(), 0)
|
||
test_connection_mock.assert_called_once()
|
||
validate_mock.assert_called_once()
|
||
|
||
def test_get_active_connection_raises_when_missing(self):
|
||
with self.assertRaisesMessage(
|
||
ExchangeServiceError,
|
||
"Активное подключение не найдено",
|
||
):
|
||
ExchangeConnectionService.get_active_connection()
|
||
|
||
def test_test_connection_success_runs_select(self):
|
||
connection = ExchangeConnectionFactory()
|
||
db_connection = MagicMock()
|
||
cursor = MagicMock()
|
||
db_connection.cursor.return_value.__enter__.return_value = cursor
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch.object(
|
||
ExchangeConnectionService,
|
||
"_configure_alias",
|
||
return_value="exchange_target_1",
|
||
), patch("apps.exchange.services.connections", connections_mock):
|
||
alias = ExchangeConnectionService.test_connection(connection)
|
||
|
||
self.assertEqual(alias, "exchange_target_1")
|
||
db_connection.ensure_connection.assert_called_once_with()
|
||
cursor.execute.assert_called_once_with("SELECT 1")
|
||
|
||
def test_test_connection_marks_error_on_failure(self):
|
||
connection = ExchangeConnectionFactory(last_error="")
|
||
db_connection = MagicMock()
|
||
db_connection.ensure_connection.side_effect = RuntimeError("boom")
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch.object(
|
||
ExchangeConnectionService,
|
||
"_configure_alias",
|
||
return_value="exchange_target_1",
|
||
), patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), self.assertRaisesMessage(
|
||
ExchangeServiceError,
|
||
"Ошибка подключения к целевой БД: boom",
|
||
):
|
||
ExchangeConnectionService.test_connection(connection)
|
||
|
||
connection.refresh_from_db()
|
||
self.assertEqual(connection.last_error, "boom")
|
||
self.assertIsNotNone(connection.last_checked_at)
|
||
|
||
def test_test_connection_failure_for_unsaved_connection_does_not_try_to_save(self):
|
||
connection = ExchangeConnectionFactory.build(last_error="")
|
||
db_connection = MagicMock()
|
||
db_connection.ensure_connection.side_effect = RuntimeError("boom")
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch.object(
|
||
ExchangeConnectionService,
|
||
"_configure_alias",
|
||
return_value="exchange_target_unsaved",
|
||
), patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), self.assertRaisesMessage(
|
||
ExchangeServiceError,
|
||
"Ошибка подключения к целевой БД: boom",
|
||
):
|
||
ExchangeConnectionService.test_connection(connection)
|
||
|
||
self.assertEqual(connection.last_error, "boom")
|
||
self.assertIsNotNone(connection.last_checked_at)
|
||
|
||
def test_validate_target_structure_calls_all_validation_steps(self):
|
||
connection = ExchangeConnectionFactory()
|
||
db_connection = MagicMock()
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_extend_models_with_dependencies",
|
||
return_value=[_FakeModel],
|
||
) as extend_mock, patch.object(
|
||
ExchangeConnectionService,
|
||
"_get_parser_models",
|
||
return_value=[_FakeModel],
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_validate_schema_exists",
|
||
) as schema_mock, patch.object(
|
||
ExchangeConnectionService,
|
||
"_validate_tables_exist",
|
||
) as tables_mock, patch.object(
|
||
ExchangeConnectionService,
|
||
"_validate_columns_exist",
|
||
) as columns_mock:
|
||
ExchangeConnectionService.validate_target_structure(
|
||
connection=connection,
|
||
alias="target_alias",
|
||
schema_name="public",
|
||
)
|
||
|
||
db_connection.ensure_connection.assert_called_once_with()
|
||
extend_mock.assert_called_once()
|
||
schema_mock.assert_called_once_with(alias="target_alias", schema_name="public")
|
||
tables_mock.assert_called_once_with(
|
||
alias="target_alias",
|
||
schema_name="public",
|
||
models_to_copy=[_FakeModel],
|
||
)
|
||
columns_mock.assert_called_once_with(
|
||
alias="target_alias",
|
||
schema_name="public",
|
||
models_to_copy=[_FakeModel],
|
||
)
|
||
|
||
def test_validate_target_structure_marks_and_reraises_exchange_error(self):
|
||
connection = ExchangeConnectionFactory(last_error="")
|
||
db_connection = MagicMock()
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_validate_schema_exists",
|
||
side_effect=ExchangeServiceError("bad schema"),
|
||
), self.assertRaisesMessage(ExchangeServiceError, "bad schema"):
|
||
ExchangeConnectionService.validate_target_structure(
|
||
connection=connection,
|
||
alias="target_alias",
|
||
schema_name="public",
|
||
models_to_copy=[_FakeModel],
|
||
)
|
||
|
||
connection.refresh_from_db()
|
||
self.assertEqual(connection.last_error, "bad schema")
|
||
|
||
def test_validate_target_structure_wraps_generic_error(self):
|
||
connection = ExchangeConnectionFactory(last_error="")
|
||
db_connection = MagicMock()
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_validate_schema_exists",
|
||
side_effect=RuntimeError("unexpected"),
|
||
), self.assertRaisesMessage(
|
||
ExchangeServiceError,
|
||
"Ошибка проверки структуры целевой БД: unexpected",
|
||
):
|
||
ExchangeConnectionService.validate_target_structure(
|
||
connection=connection,
|
||
alias="target_alias",
|
||
schema_name="public",
|
||
models_to_copy=[_FakeModel],
|
||
)
|
||
|
||
connection.refresh_from_db()
|
||
self.assertEqual(connection.last_error, "unexpected")
|
||
|
||
def test_copy_parsers_data_success(self):
|
||
connection = ExchangeConnectionFactory(schema_name="target_schema")
|
||
db_connection = MagicMock()
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_configure_alias",
|
||
return_value="target_alias",
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_resolve_models",
|
||
return_value=[_FakeModel, _AnotherFakeModel],
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_extend_models_with_dependencies",
|
||
return_value=[_FakeModel, _AnotherFakeModel],
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"validate_target_structure",
|
||
) as validate_mock, patch.object(
|
||
ExchangeConnectionService,
|
||
"_copy_model_data",
|
||
side_effect=[2, 3],
|
||
) as copy_mock:
|
||
result = ExchangeConnectionService.copy_parsers_data(
|
||
connection=connection,
|
||
mode="selected",
|
||
tables=["fake_table", "another_table"],
|
||
truncate_before_copy=False,
|
||
)
|
||
|
||
self.assertEqual(result["mode"], "selected")
|
||
self.assertEqual(result["tables"], ["fake_table", "another_table"])
|
||
self.assertEqual(result["rows_by_table"], {"fake_table": 2, "another_table": 3})
|
||
self.assertEqual(result["total_rows"], 5)
|
||
self.assertFalse(result["truncate_before_copy"])
|
||
validate_mock.assert_called_once_with(
|
||
connection=connection,
|
||
alias="target_alias",
|
||
schema_name="target_schema",
|
||
models_to_copy=[_FakeModel, _AnotherFakeModel],
|
||
)
|
||
self.assertEqual(copy_mock.call_count, 2)
|
||
connection.refresh_from_db()
|
||
self.assertEqual(connection.last_error, "")
|
||
self.assertIsNotNone(connection.last_checked_at)
|
||
|
||
def test_copy_parsers_data_marks_connection_error_on_connect_failure(self):
|
||
connection = ExchangeConnectionFactory(last_error="")
|
||
db_connection = MagicMock()
|
||
db_connection.ensure_connection.side_effect = RuntimeError("target unavailable")
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_configure_alias",
|
||
return_value="target_alias",
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_resolve_models",
|
||
return_value=[_FakeModel],
|
||
), patch.object(
|
||
ExchangeConnectionService,
|
||
"_extend_models_with_dependencies",
|
||
return_value=[_FakeModel],
|
||
), self.assertRaisesMessage(
|
||
ExchangeServiceError,
|
||
"Ошибка подключения к целевой БД: target unavailable",
|
||
):
|
||
ExchangeConnectionService.copy_parsers_data(
|
||
connection=connection,
|
||
mode="all",
|
||
)
|
||
|
||
connection.refresh_from_db()
|
||
self.assertEqual(connection.last_error, "target unavailable")
|
||
|
||
def test_configure_alias_closes_existing_connection_and_clears_cache(self):
|
||
connection = ExchangeConnectionFactory(password=_db_secret())
|
||
alias = f"exchange_target_{connection.id}"
|
||
existing_db_connection = MagicMock()
|
||
storage = SimpleNamespace(**{alias: "stale"})
|
||
connections_mock = MagicMock()
|
||
connections_mock.databases = {alias: {"ENGINE": "old"}}
|
||
connections_mock.__getitem__.return_value = existing_db_connection
|
||
connections_mock._connections = storage
|
||
|
||
with patch("apps.exchange.services.connections", connections_mock):
|
||
configured_alias = ExchangeConnectionService._configure_alias(connection)
|
||
|
||
self.assertEqual(configured_alias, alias)
|
||
existing_db_connection.close.assert_called_once_with()
|
||
self.assertEqual(
|
||
connections_mock.databases[alias]["NAME"], connection.database_name
|
||
)
|
||
self.assertEqual(connections_mock.databases[alias]["PASSWORD"], "secret")
|
||
self.assertNotIn(alias, storage.__dict__)
|
||
|
||
def test_validate_schema_exists_raises_when_schema_missing(self):
|
||
cursor = MagicMock()
|
||
cursor.fetchone.return_value = None
|
||
db_connection = MagicMock()
|
||
db_connection.cursor.return_value.__enter__.return_value = cursor
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), self.assertRaisesMessage(
|
||
ExchangeServiceError,
|
||
"Схема 'public' отсутствует в целевой БД",
|
||
):
|
||
ExchangeConnectionService._validate_schema_exists(
|
||
alias="target_alias",
|
||
schema_name="public",
|
||
)
|
||
|
||
def test_validate_tables_exist_raises_when_tables_missing(self):
|
||
cursor = MagicMock()
|
||
cursor.fetchall.return_value = [("fake_table",)]
|
||
db_connection = MagicMock()
|
||
db_connection.cursor.return_value.__enter__.return_value = cursor
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), self.assertRaisesMessage(
|
||
ExchangeServiceError,
|
||
"В целевой БД отсутствуют таблицы: another_table",
|
||
):
|
||
ExchangeConnectionService._validate_tables_exist(
|
||
alias="target_alias",
|
||
schema_name="public",
|
||
models_to_copy=[_FakeModel, _AnotherFakeModel],
|
||
)
|
||
|
||
def test_validate_columns_exist_raises_when_columns_missing(self):
|
||
cursor_context = MagicMock()
|
||
cursor = MagicMock()
|
||
cursor.fetchall.return_value = [("id",)]
|
||
cursor_context.__enter__.return_value = cursor
|
||
db_connection = MagicMock()
|
||
db_connection.cursor.return_value = cursor_context
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch(
|
||
"apps.exchange.services.connections", connections_mock
|
||
), self.assertRaisesMessage(
|
||
ExchangeServiceError,
|
||
"В таблице 'fake_table' отсутствуют колонки: name",
|
||
):
|
||
ExchangeConnectionService._validate_columns_exist(
|
||
alias="target_alias",
|
||
schema_name="public",
|
||
models_to_copy=[_FakeModel],
|
||
)
|
||
|
||
def test_get_parser_models_uses_configured_labels(self):
|
||
resolved_models = [
|
||
_FakeModel for _ in ExchangeConnectionService.PARSER_MODEL_LABELS
|
||
]
|
||
|
||
with patch(
|
||
"apps.exchange.services.django_apps.get_model",
|
||
side_effect=resolved_models,
|
||
) as get_model_mock:
|
||
result = ExchangeConnectionService._get_parser_models()
|
||
|
||
self.assertEqual(result, resolved_models)
|
||
self.assertEqual(
|
||
[call.args[0] for call in get_model_mock.call_args_list],
|
||
ExchangeConnectionService.PARSER_MODEL_LABELS,
|
||
)
|
||
|
||
def test_resolve_models_supports_table_and_class_names(self):
|
||
with patch.object(
|
||
ExchangeConnectionService,
|
||
"_get_parser_models",
|
||
return_value=[ParserLoadLog, ManufacturerRecord],
|
||
):
|
||
selected = ExchangeConnectionService._resolve_models(
|
||
mode="selected",
|
||
table=None,
|
||
tables=["parsers_load_log", "manufacturerrecord"],
|
||
)
|
||
|
||
self.assertEqual(selected, [ParserLoadLog, ManufacturerRecord])
|
||
|
||
def test_resolve_models_raises_on_unknown_table(self):
|
||
with patch.object(
|
||
ExchangeConnectionService,
|
||
"_get_parser_models",
|
||
return_value=[ParserLoadLog],
|
||
), self.assertRaisesMessage(ExchangeServiceError, "Неизвестная таблица"):
|
||
ExchangeConnectionService._resolve_models(
|
||
mode="single",
|
||
table="unknown_table",
|
||
tables=None,
|
||
)
|
||
|
||
def test_truncate_tables_executes_in_reverse_order(self):
|
||
cursor = MagicMock()
|
||
db_connection = MagicMock()
|
||
db_connection.cursor.return_value.__enter__.return_value = cursor
|
||
connections_mock = MagicMock()
|
||
connections_mock.__getitem__.return_value = db_connection
|
||
|
||
with patch("apps.exchange.services.connections", connections_mock):
|
||
ExchangeConnectionService._truncate_tables(
|
||
alias="target_alias",
|
||
models_to_copy=[_FakeModel, _AnotherFakeModel],
|
||
)
|
||
|
||
executed_sql = [call.args[0] for call in cursor.execute.call_args_list]
|
||
self.assertEqual(
|
||
executed_sql,
|
||
[
|
||
'TRUNCATE TABLE "another_table" RESTART IDENTITY CASCADE',
|
||
'TRUNCATE TABLE "fake_table" RESTART IDENTITY CASCADE',
|
||
],
|
||
)
|
||
|
||
def test_copy_model_data_splits_batches(self):
|
||
source_objects = [
|
||
SimpleNamespace(id=1, name="A"),
|
||
SimpleNamespace(id=2, name="B"),
|
||
SimpleNamespace(id=3, name="C"),
|
||
]
|
||
queryset = MagicMock()
|
||
queryset.order_by.return_value = queryset
|
||
queryset.iterator.return_value = source_objects
|
||
default_manager = MagicMock()
|
||
default_manager.all.return_value = queryset
|
||
_FakeModel.objects = MagicMock()
|
||
_FakeModel.objects.using.return_value = default_manager
|
||
|
||
with patch.object(
|
||
ExchangeConnectionService,
|
||
"_insert_batch",
|
||
side_effect=[2, 1],
|
||
) as insert_mock:
|
||
total = ExchangeConnectionService._copy_model_data(
|
||
model=_FakeModel,
|
||
alias="target_alias",
|
||
truncate_before_copy=True,
|
||
chunk_size=2,
|
||
)
|
||
|
||
self.assertEqual(total, 3)
|
||
self.assertEqual(insert_mock.call_count, 2)
|
||
|
||
def test_insert_batch_returns_batch_size_in_truncate_mode(self):
|
||
manager = MagicMock()
|
||
_FakeModel.objects = MagicMock()
|
||
_FakeModel.objects.using.return_value = manager
|
||
batch = [_FakeModel(id=1, name="A"), _FakeModel(id=2, name="B")]
|
||
|
||
created_count = ExchangeConnectionService._insert_batch(
|
||
model=_FakeModel,
|
||
alias="target_alias",
|
||
batch=batch,
|
||
pk_name="id",
|
||
chunk_size=100,
|
||
truncate_before_copy=True,
|
||
)
|
||
|
||
self.assertEqual(created_count, 2)
|
||
manager.bulk_create.assert_called_once_with(
|
||
batch,
|
||
batch_size=100,
|
||
ignore_conflicts=False,
|
||
)
|
||
|
||
def test_insert_batch_counts_only_new_rows_without_truncate(self):
|
||
manager = MagicMock()
|
||
manager.filter.side_effect = [
|
||
MagicMock(values_list=MagicMock(return_value=[1])),
|
||
MagicMock(values_list=MagicMock(return_value=[1, 2])),
|
||
]
|
||
_FakeModel.objects = MagicMock()
|
||
_FakeModel.objects.using.return_value = manager
|
||
batch = [_FakeModel(id=1, name="A"), _FakeModel(id=2, name="B")]
|
||
|
||
created_count = ExchangeConnectionService._insert_batch(
|
||
model=_FakeModel,
|
||
alias="target_alias",
|
||
batch=batch,
|
||
pk_name="id",
|
||
chunk_size=100,
|
||
truncate_before_copy=False,
|
||
)
|
||
|
||
self.assertEqual(created_count, 1)
|
||
manager.bulk_create.assert_called_once_with(
|
||
batch,
|
||
batch_size=100,
|
||
ignore_conflicts=True,
|
||
)
|
||
|
||
def test_mark_connection_error_updates_connection(self):
|
||
connection = ExchangeConnectionFactory(last_error="")
|
||
|
||
ExchangeConnectionService._mark_connection_error(connection, "broken")
|
||
|
||
connection.refresh_from_db()
|
||
self.assertEqual(connection.last_error, "broken")
|
||
self.assertIsNotNone(connection.last_checked_at)
|
||
|
||
def tearDown(self):
|
||
for alias in list(getattr(ExchangeConnectionService, "__dict__", {})):
|
||
if alias.startswith("exchange_target_"):
|
||
with suppress(Exception):
|
||
from django.db import connections
|
||
|
||
connections[alias].close()
|
||
with suppress(Exception):
|
||
from django.db import connections
|
||
|
||
connections.databases.pop(alias, None)
|