feat: expand platform APIs, sources, and test coverage
Some checks failed
CI/CD Pipeline / Run Tests (pull_request) Successful in 1m53s
CI/CD Pipeline / Telegram Notify Success (push) Has been cancelled
CI/CD Pipeline / Run Tests (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (push) Has been cancelled
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 2m54s
CI/CD Pipeline / Telegram Notify Success (pull_request) Has been skipped

This commit is contained in:
2026-03-17 12:56:48 +01:00
parent b505c67968
commit 3d298ce352
101 changed files with 8387 additions and 292 deletions

View File

@@ -1,8 +1,12 @@
"""Tests for exchange services."""
from contextlib import suppress
from apps.exchange.models import ExchangeConnection
from apps.exchange.services import ExchangeConnectionService
from apps.parsers.models import IndustrialCertificateRecord, ParserLoadLog
from apps.registers.models import Organization
from django.db import connections
from django.test import TestCase
@@ -23,3 +27,63 @@ class ExchangeConnectionServiceDependenciesTest(TestCase):
self.assertEqual(models_to_copy[0], Organization)
self.assertEqual(models_to_copy[1], IndustrialCertificateRecord)
class ExchangeConnectionEncryptionTest(TestCase):
def test_save_encrypts_password(self):
connection = ExchangeConnection.objects.create(
server="127.0.0.1",
port=5432,
username="postgres",
password="secret", # noqa: S106
database_name="target_db",
schema_name="public",
)
self.assertNotEqual(connection.password, "secret")
self.assertTrue(ExchangeConnection.is_password_encrypted(connection.password))
self.assertEqual(connection.get_decrypted_password(), "secret")
def test_save_encrypts_legacy_password_on_partial_update(self):
connection = ExchangeConnection.objects.create(
server="127.0.0.1",
port=5432,
username="postgres",
password="secret", # noqa: S106
database_name="target_db",
schema_name="public",
)
ExchangeConnection.objects.filter(id=connection.id).update(
password="legacy-pass" # noqa: S106
)
connection.refresh_from_db()
self.assertEqual(connection.password, "legacy-pass")
connection.last_error = "checked"
connection.save(update_fields=["last_error", "updated_at"])
connection.refresh_from_db()
self.assertNotEqual(connection.password, "legacy-pass")
self.assertEqual(connection.get_decrypted_password(), "legacy-pass")
def test_configure_alias_uses_decrypted_password(self):
connection = ExchangeConnection.objects.create(
server="127.0.0.1",
port=5432,
username="postgres",
password="secret", # noqa: S106
database_name="target_db",
schema_name="public",
)
alias = ExchangeConnectionService._configure_alias(connection)
try:
self.assertEqual(connections.databases[alias]["PASSWORD"], "secret")
finally:
with suppress(Exception):
connections[alias].close()
connections.databases.pop(alias, None)
storage = getattr(connections, "_connections", None)
if storage is not None and hasattr(storage, "__dict__"):
storage.__dict__.pop(alias, None)