Files
mostovik-backend/tests/test_api_inventory_e2e.py
Aleksandr Meshchriakov 4ca6b75393
Some checks failed
CI/CD Pipeline / Code Quality Checks (pull_request) Failing after 3m20s
CI/CD Pipeline / Run Tests (pull_request) Successful in 13m45s
CI/CD Pipeline / Run API Inventory E2E Tests (pull_request) Successful in 22s
CI/CD Pipeline / Telegram Notify Success (pull_request) Has been skipped
fix(api): align contracts with frontend md
2026-03-23 13:12:10 +01:00

719 lines
26 KiB
Python

"""End-to-end smoke coverage for the full HTTP API inventory."""
from __future__ import annotations
import io
from datetime import date, timedelta
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
from unittest.mock import patch
from apps.backups.models import BackupExportJob
from apps.core.models import BackgroundJob
from apps.exchange.models import ExchangeConnection
from apps.parsers.models import (
FinancialReport,
FinancialReportLine,
ParserLoadLog,
ProcurementRecord,
)
from apps.user.services import UserService
from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse
from django.utils import timezone
from django_celery_beat.models import PeriodicTask
from openpyxl import Workbook
from rest_framework import status
from rest_framework.test import APITestCase
from tests.apps.parsers.factories import (
IndustrialCertificateRecordFactory,
IndustrialProductRecordFactory,
InspectionRecordFactory,
ManufacturerRecordFactory,
ParserLoadLogFactory,
)
from tests.apps.registers.factories import RegisterFactory
from tests.apps.user.factories import UserFactory
from tests.utils.fixtures import fake
def _digits(length: int) -> str:
return "".join(str(fake.random_int(0, 9)) for _ in range(length))
def _build_fns_excel_bytes() -> bytes:
    """Return the bytes of an in-memory XLSX workbook with three generated rows.

    Row 1 holds the literal ``"Form"`` and a random year between 2020 and
    2025, row 2 holds the column captions, and row 3 holds a random word, a
    four-digit code and two random integers between 10 and 999.
    """
    book = Workbook()
    sheet = book.active

    # Random values are drawn in the same order as the cells they fill.
    report_year = fake.random_int(min=2020, max=2025)
    line_title = fake.word()
    line_code = _digits(4)
    start_value = fake.random_int(10, 999)
    end_value = fake.random_int(10, 999)

    sheet_rows = (
        ["Form", None, report_year, None],
        [None, "Code", "Start", "End"],
        [line_title, line_code, start_value, end_value],
    )
    for sheet_row in sheet_rows:
        sheet.append(sheet_row)

    stream = io.BytesIO()
    book.save(stream)
    book.close()
    return stream.getvalue()
def _build_register_excel_bytes(rows: list[dict[str, str]]) -> bytes:
    """Serialize ``rows`` into an in-memory XLSX workbook and return its bytes.

    The first sheet row is the fixed header; each following row takes the
    values of ``rows`` in header order. A row dict that lacks one of the
    header keys raises ``KeyError``.
    """
    columns = ("pn_name", "mn_ogrn", "mn_inn", "in_kpp", "mn_okpo")

    book = Workbook()
    sheet = book.active
    sheet.append(list(columns))
    for record in rows:
        sheet.append([record[column] for column in columns])

    stream = io.BytesIO()
    book.save(stream)
    book.close()
    return stream.getvalue()
def _extract_results(response_data):
if hasattr(response_data, "get"):
data = response_data.get("data")
if isinstance(data, list):
return data
results = response_data.get("results")
if results is not None:
return results
return response_data
def _create_procurement_record() -> ProcurementRecord:
    """Persist and return a ``ProcurementRecord`` filled with generated values."""
    # A dict literal evaluates top to bottom, so the ``fake`` calls happen in
    # the same order as the keyword arguments they feed.
    attributes = {
        "load_batch": fake.random_int(min=1, max=1000),
        "purchase_number": _digits(19),
        "purchase_name": fake.sentence(nb_words=6),
        "customer_inn": _digits(10),
        "customer_kpp": _digits(9),
        "customer_ogrn": _digits(13),
        "customer_name": fake.company(),
        "max_price": str(
            fake.pydecimal(left_digits=7, right_digits=2, positive=True)
        ),
        "status": fake.word(),
        "law_type": "44-FZ",
        "href": fake.url(),
        "region_code": f"{fake.random_int(min=1, max=99):02d}",
    }
    return ProcurementRecord.objects.create(**attributes)
class AuthenticatedApiMixin:
    """Mixin for API test cases that need to act as a given user."""

    def authenticate(self, user):
        """Attach ``user``'s access token to ``self.client`` as a Bearer header.

        Returns the token mapping produced by
        ``UserService.get_tokens_for_user``.
        """
        issued = UserService.get_tokens_for_user(user)
        bearer_header = f"Bearer {issued['access']}"
        self.client.credentials(HTTP_AUTHORIZATION=bearer_header)
        return issued
class PublicApiInventoryE2ETest(APITestCase):
    """Smoke checks for routes requested without any credentials."""

    def test_health_and_swagger_endpoints(self):
        """Swagger UI and the three health probes all answer 200."""
        route_names = (
            "schema-swagger-ui",
            "core:health",
            "core:liveness",
            "core:readiness",
        )
        # Issue every request first, then assert, in the listed order.
        responses = [self.client.get(reverse(name)) for name in route_names]
        for response in responses:
            self.assertEqual(response.status_code, status.HTTP_200_OK)
class UserApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke checks for the self-service user routes and the admin user routes."""

    def setUp(self):
        # The admin user-management test authenticates as this superuser.
        self.admin = UserFactory.create_superuser()

    def _post_json(self, url, payload):
        """POST ``payload`` to ``url`` as JSON and return the response."""
        return self.client.post(url, payload, format="json")

    def _patch_json(self, url, payload):
        """PATCH ``payload`` to ``url`` as JSON and return the response."""
        return self.client.patch(url, payload, format="json")

    def _use_access_token(self, access_token):
        """Send ``access_token`` as a Bearer header on subsequent requests."""
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")

    def _assert_ok(self, response):
        """Assert that ``response`` carries HTTP 200."""
        self.assertEqual(response.status_code, status.HTTP_200_OK)

    def test_auth_and_profile_endpoints(self):
        """Register, log in, manage tokens and profile, change password, log out."""
        old_secret = fake.password(length=16, special_chars=False)
        fresh_secret = fake.password(length=18, special_chars=False)
        signup = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "password": old_secret,
            "password_confirm": old_secret,
            "phone": f"+7{fake.numerify('##########')}",
            "first_name": "Ivan",
            "middle_name": "Ivanovich",
            "last_name": "Ivanov",
        }
        username = signup["username"]

        # Registration.
        signed_up = self._post_json(reverse("api_v1:user:register"), signup)
        self.assertEqual(signed_up.status_code, status.HTTP_201_CREATED)
        self.assertIn("tokens", signed_up.data)

        # Login with the initial password, then verify and refresh the tokens.
        logged_in = self._post_json(
            reverse("api_v1:user:login"),
            {"username": username, "password": old_secret},
        )
        self._assert_ok(logged_in)

        verified = self._post_json(
            reverse("api_v1:user:token_verify"),
            {"token": logged_in.data["access"]},
        )
        self._assert_ok(verified)

        refreshed = self._post_json(
            reverse("api_v1:user:token_refresh"),
            {"refresh": logged_in.data["refresh"]},
        )
        self._assert_ok(refreshed)
        self._use_access_token(refreshed.data["access"])

        # Current-user read and update.
        me = self.client.get(reverse("api_v1:user:current_user"))
        self._assert_ok(me)
        self.assertEqual(me.data["username"], username)

        me_updated = self._patch_json(
            reverse("api_v1:user:user_update"),
            {"phone": f"+7{fake.numerify('##########')}"},
        )
        self._assert_ok(me_updated)

        # Profile read, partial update and full view.
        profile = self.client.get(reverse("api_v1:user:profile_detail"))
        self._assert_ok(profile)

        renamed = {
            "first_name": "Petr",
            "middle_name": "Petrovich",
            "last_name": "Petrov",
        }
        profile_patched = self._patch_json(
            reverse("api_v1:user:profile_detail"), renamed
        )
        self._assert_ok(profile_patched)

        profile_full = self.client.get(reverse("api_v1:user:profile_full"))
        self._assert_ok(profile_full)
        self.assertEqual(profile_full.data["username"], username)

        # Password change, followed by a login with the new password.
        password_changed = self._post_json(
            reverse("api_v1:user:password_change"),
            {
                "old_password": old_secret,
                "new_password": fresh_secret,
                "new_password_confirm": fresh_secret,
            },
        )
        self._assert_ok(password_changed)

        logged_in_again = self._post_json(
            reverse("api_v1:user:login"),
            {"username": username, "password": fresh_secret},
        )
        self._assert_ok(logged_in_again)
        self._use_access_token(logged_in_again.data["access"])

        logged_out = self._post_json(reverse("api_v1:user:logout"), {})
        self._assert_ok(logged_out)

    def test_admin_user_management_endpoints(self):
        """List, create, read, patch, deactivate and reactivate a managed user."""
        self.authenticate(self.admin)

        listing = self.client.get(reverse("api_v1:user:admin-users"))
        self._assert_ok(listing)
        self.assertEqual(
            set(listing.data.keys()),
            {"count", "next", "previous", "results"},
        )
        page = listing.data["results"]
        self.assertTrue(page)
        self.assertIn("profile", page[0])

        new_user = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "phone": f"+7{fake.numerify('##########')}",
            "password": "AdminManagedPass123",
            "role": "user",
            "first_name": "Alex",
            "middle_name": "Alexeevich",
            "last_name": "Alexeev",
        }
        created = self._post_json(reverse("api_v1:user:admin-users"), new_user)
        self.assertEqual(created.status_code, status.HTTP_201_CREATED)

        managed_id = created.data["id"]
        detail_url = reverse("api_v1:user:admin-user-detail", args=[managed_id])

        detail = self.client.get(detail_url)
        self._assert_ok(detail)

        promotion = {
            "role": "admin",
            "first_name": "Sergey",
            "middle_name": "Sergeevich",
            "last_name": "Sergeev",
        }
        patched = self._patch_json(detail_url, promotion)
        self._assert_ok(patched)
        self.assertEqual(patched.data["role"], "admin")

        deactivated = self._post_json(
            reverse("api_v1:user:admin-user-deactivate", args=[managed_id]), {}
        )
        self._assert_ok(deactivated)

        activated = self._post_json(
            reverse("api_v1:user:admin-user-activate", args=[managed_id]), {}
        )
        self._assert_ok(activated)
        self.assertTrue(activated.data["is_active"])
class JobsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke checks for the background-job list, status and stream routes."""

    def setUp(self):
        self.user = UserFactory.create_user()

    def _create_job(self, *, task_id: str, status_value: str) -> BackgroundJob:
        """Persist a finished ``BackgroundJob`` owned by ``self.user``.

        The job is stamped as started now and completed three seconds later,
        with progress at 100.
        """
        began = timezone.now()
        job_fields = {
            "task_id": task_id,
            "task_name": "apps.parsers.tasks.fake_task",
            "status": status_value,
            "user_id": self.user.id,
            "started_at": began,
            "completed_at": began + timedelta(seconds=3),
            "progress": 100,
        }
        return BackgroundJob.objects.create(**job_fields)

    def test_jobs_endpoints(self):
        """The owner can list the job, read its status and stream its events."""
        job = self._create_job(task_id="inventory-job-1", status_value="success")
        self.authenticate(self.user)
        task_id = job.task_id

        listing = self.client.get(reverse("api_v1:jobs:job-list"))
        self.assertEqual(listing.status_code, status.HTTP_200_OK)
        first_listed = listing.data["results"][0]
        self.assertEqual(first_listed["task_id"], task_id)

        status_url = reverse("api_v1:jobs:job-status", kwargs={"task_id": task_id})
        job_status = self.client.get(status_url)
        self.assertEqual(job_status.status_code, status.HTTP_200_OK)
        self.assertEqual(job_status.data["status"], "success")

        stream_url = reverse("api_v1:jobs:job-stream", kwargs={"task_id": task_id})
        stream = self.client.get(stream_url)
        self.assertEqual(stream.status_code, status.HTTP_200_OK)

        # Drain the streaming body into one text blob before inspecting it.
        raw_stream = b"".join(stream.streaming_content)
        stream_text = raw_stream.decode("utf-8")
        self.assertIn("event: completed", stream_text)
        self.assertIn(task_id, stream_text)
class ParsersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke checks for parser registries, source cards, settings, logs and FNS upload."""

    def setUp(self):
        self.user = UserFactory.create_user()
        self.admin = UserFactory.create_superuser()

    def _create_financial_report(self) -> FinancialReport:
        """Persist a successful API-sourced ``FinancialReport`` with one line."""
        # Dict literal order keeps the generated values in the same sequence
        # as the keyword arguments they feed.
        report_fields = {
            "external_id": _digits(5),
            "ogrn": _digits(13),
            "file_name": f"fin_{_digits(5)}_{_digits(13)}.xlsx",
            "file_hash": fake.sha256(raw_output=False),
            "load_batch": fake.random_int(min=1, max=1000),
            "status": FinancialReport.Status.SUCCESS,
            "source": FinancialReport.SourceType.API,
        }
        created = FinancialReport.objects.create(**report_fields)
        FinancialReportLine.objects.create(
            report=created,
            form_code="1",
            line_code="1100",
            line_name="Assets",
            year=2025,
            period_start=100,
            period_end=200,
        )
        return created

    def test_registry_read_endpoints_for_parsers_data(self):
        """Every list/detail pair of the parser registries answers 200."""
        certificate = IndustrialCertificateRecordFactory()
        manufacturer = ManufacturerRecordFactory()
        product = IndustrialProductRecordFactory()
        inspection = InspectionRecordFactory()
        procurement = _create_procurement_record()
        report = self._create_financial_report()
        self.authenticate(self.user)

        # (list route, detail route, record whose id addresses the detail route)
        route_table = (
            (
                "api_v1:minpromtorg:certificates-list",
                "api_v1:minpromtorg:certificates-detail",
                certificate,
            ),
            (
                "api_v1:minpromtorg:manufacturers-list",
                "api_v1:minpromtorg:manufacturers-detail",
                manufacturer,
            ),
            (
                "api_v1:minpromtorg:industrial-products-list",
                "api_v1:minpromtorg:industrial-products-detail",
                product,
            ),
            (
                "api_v1:proverki:inspections-list",
                "api_v1:proverki:inspections-detail",
                inspection,
            ),
            (
                "api_v1:zakupki:procurements-list",
                "api_v1:zakupki:procurements-detail",
                procurement,
            ),
            (
                "api_v1:fns:fns-reports-list",
                "api_v1:fns:fns-reports-detail",
                report,
            ),
        )

        # All requests are issued before any assertion runs.
        responses = []
        for list_route, detail_route, record in route_table:
            responses.append(self.client.get(reverse(list_route)))
            responses.append(
                self.client.get(reverse(detail_route, args=[record.id]))
            )

        for response in responses:
            self.assertEqual(response.status_code, status.HTTP_200_OK)

    def test_sources_parsing_and_system_endpoints(self):
        """Source cards, parsing settings, parser logs, refresh and FNS upload respond."""
        inn = _digits(10)
        IndustrialCertificateRecordFactory(inn=inn)
        IndustrialProductRecordFactory(inn=inn)
        ManufacturerRecordFactory(load_batch=777, inn=inn)
        for simple_source in (
            ParserLoadLog.Source.INDUSTRIAL,
            ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
        ):
            ParserLoadLogFactory(
                source=simple_source,
                status="success",
                records_count=1,
            )
        manufacturers_log = ParserLoadLogFactory(
            source=ParserLoadLog.Source.MANUFACTURES,
            batch_id=777,
            status="success",
            records_count=1,
        )

        # Source cards are read as a regular user.
        self.authenticate(self.user)
        cards = self.client.get(reverse("api_v1:sources:source-cards-list"))
        card_statuses = self.client.get(
            reverse("api_v1:sources:source-cards-statuses")
        )
        card_detail_url = reverse(
            "api_v1:sources:source-cards-detail",
            kwargs={"slug": "manufacturers-and-products"},
        )
        card_detail = self.client.get(card_detail_url)
        for card_response in (cards, card_statuses, card_detail):
            self.assertEqual(card_response.status_code, status.HTTP_200_OK)

        # The remaining routes are exercised as the superuser.
        self.authenticate(self.admin)
        settings_url = reverse("api_v1:parsing:parsing-settings")
        settings_read = self.client.get(settings_url)
        settings_patched = self.client.patch(
            settings_url,
            {"planned_inspections": "weekly"},
            format="json",
        )
        logs = self.client.get(reverse("api_v1:system:parser-logs-list"))
        log_detail = self.client.get(
            reverse("api_v1:system:parser-logs-detail", args=[manufacturers_log.id])
        )
        logs_exported = self.client.get(reverse("api_v1:system:parser-logs-export"))

        xlsx_mime = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        with TemporaryDirectory() as scratch:
            scratch_root = Path(scratch)
            refresh_url = reverse(
                "api_v1:sources:source-cards-refresh",
                kwargs={"slug": "financial-indicators"},
            )
            refreshed = self.client.post(refresh_url, {}, format="json")
            workbook_upload = SimpleUploadedFile(
                f"fin_{_digits(5)}_{_digits(13)}.xlsx",
                _build_fns_excel_bytes(),
                content_type=xlsx_mime,
            )
            # Point the FNS directories at the scratch dir for the upload only.
            fns_directories = {
                "FNS_WATCH_DIRECTORY": str(scratch_root / "watch"),
                "FNS_PROCESSED_DIRECTORY": str(scratch_root / "processed"),
                "FNS_FAILED_DIRECTORY": str(scratch_root / "failed"),
            }
            with self.settings(**fns_directories):
                uploaded = self.client.post(
                    reverse("api_v1:fns:fns-upload"),
                    {"file": workbook_upload},
                    format="multipart",
                )

        accepted = {status.HTTP_200_OK, status.HTTP_202_ACCEPTED}
        admin_responses = (
            settings_read,
            settings_patched,
            logs,
            log_detail,
            logs_exported,
            refreshed,
            uploaded,
        )
        for admin_response in admin_responses:
            self.assertIn(admin_response.status_code, accepted)
class RegistersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke checks for the register upload and the registry/organization read routes."""

    def setUp(self):
        self.user = UserFactory.create_user()
        self.admin = UserFactory.create_superuser()

    def test_registers_endpoints(self):
        """Upload a one-row register as admin, then read it back as a regular user."""
        registry = RegisterFactory(name="Inventory Registry")
        rows = [
            {
                "pn_name": "Inventory Org",
                "mn_ogrn": "1027600980990",
                "mn_inn": "7601000086",
                "in_kpp": "760401001",
                "mn_okpo": "07506197",
            }
        ]

        self.authenticate(self.admin)
        upload = SimpleUploadedFile(
            "inventory-registry.xlsx",
            _build_register_excel_bytes(rows),
            content_type=(
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
            ),
        )
        upload_response = self.client.post(
            reverse("api_v1:registers:register-upload"),
            {
                "registry": str(registry.id),
                "actual_date": date(2026, 1, 1).isoformat(),
                "file": upload,
            },
            format="multipart",
        )
        self.assertEqual(upload_response.status_code, status.HTTP_201_CREATED)

        self.authenticate(self.user)
        registries_list = self.client.get(reverse("api_v1:registers:registries-list"))
        registries_detail = self.client.get(
            reverse("api_v1:registers:registries-detail", args=[registry.id])
        )
        organizations_list = self.client.get(
            reverse("api_v1:registers:organizations-list")
        )

        # Check the listing before indexing into it, so a failed or empty
        # response is reported as such instead of as IndexError/KeyError.
        self.assertEqual(
            organizations_list.status_code,
            status.HTTP_200_OK,
            "organizations-list",
        )
        organizations = _extract_results(organizations_list.data)
        self.assertTrue(
            organizations,
            "organizations-list returned no items after the register upload",
        )
        organization_id = organizations[0]["id"]

        organization_detail = self.client.get(
            reverse("api_v1:registers:organizations-detail", args=[organization_id])
        )
        registry_organizations = self.client.get(
            reverse(
                "api_v1:registers:registry-organizations-list",
                args=[registry.id],
            )
        )

        # Label each assertion so a failure names the route that broke.
        labelled_responses = (
            ("registries-list", registries_list),
            ("registries-detail", registries_detail),
            ("organizations-list", organizations_list),
            ("organizations-detail", organization_detail),
            ("registry-organizations-list", registry_organizations),
        )
        for label, response in labelled_responses:
            self.assertEqual(response.status_code, status.HTTP_200_OK, label)
class ExchangeApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke checks for exchange connections, the copy trigger and periodic tasks."""

    def setUp(self):
        self.admin = UserFactory.create_superuser()

    # ``patch`` decorators apply bottom-up, so the mock parameters below are
    # listed in the reverse order of the decorators.
    @patch("apps.exchange.services.ExchangeConnectionService.validate_target_structure")
    @patch("apps.exchange.services.ExchangeConnectionService.test_connection")
    @patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload")
    @patch("apps.exchange.views.copy_parsers_data_async.delay")
    @patch("apps.exchange.services.ExchangeConnectionService.get_active_connection")
    def test_exchange_endpoints(
        self,
        get_active_connection_mock,
        delay_mock,
        test_connection_payload_mock,
        _test_connection_mock,
        _validate_mock,
    ):
        """Create and test a connection, trigger a copy, then manage a periodic task."""
        self.authenticate(self.admin)
        connections_url = reverse("api_v1:exchange:connections")
        test_connection_url = reverse("api_v1:exchange:connections-test")
        copy_url = reverse("api_v1:exchange:copy")
        periodic_tasks_url = reverse("api_v1:exchange:periodic-tasks")
        accepted_statuses = {
            status.HTTP_200_OK,
            status.HTTP_201_CREATED,
            status.HTTP_202_ACCEPTED,
        }

        connection_payload = {
            "server": "127.0.0.1",
            "port": 5432,
            "username": "postgres",
            "password": "secret",
            "database_name": "target_db",
            "schema_name": "public",
        }
        test_connection_payload_mock.return_value = {
            "status": "success",
            "message": "ok",
        }

        list_connections = self.client.get(connections_url)
        create_connection = self.client.post(
            connections_url,
            connection_payload,
            format="json",
        )
        connection_test = self.client.post(
            test_connection_url,
            connection_payload,
            format="json",
        )

        # Check the create response before reading its id, so a rejected
        # request is reported by status instead of as KeyError/DoesNotExist.
        self.assertIn(
            create_connection.status_code,
            accepted_statuses,
            f"create connection: {getattr(create_connection, 'data', None)!r}",
        )
        active_connection = ExchangeConnection.objects.get(
            id=create_connection.data["id"]
        )
        get_active_connection_mock.return_value = active_connection
        delay_mock.return_value = SimpleNamespace(id="exchange-task-1")

        copy_response = self.client.post(copy_url, {"mode": "all"}, format="json")
        list_periodic = self.client.get(periodic_tasks_url)
        create_periodic = self.client.post(
            periodic_tasks_url,
            {
                "schedule_type": "interval",
                "interval_every": 1,
                "interval_period": "hours",
                "notify_on_error": True,
            },
            format="json",
        )

        # Same guard for the periodic task id used by the detail routes.
        self.assertIn(
            create_periodic.status_code,
            accepted_statuses,
            f"create periodic task: {getattr(create_periodic, 'data', None)!r}",
        )
        periodic_id = create_periodic.data["id"]
        periodic_detail_url = reverse(
            "api_v1:exchange:periodic-task-detail",
            args=[periodic_id],
        )
        detail_periodic = self.client.get(periodic_detail_url)
        patch_periodic = self.client.patch(
            periodic_detail_url,
            {
                "schedule_type": "daily",
                "crontab_hour": 2,
                "crontab_minute": 30,
                "notify_on_error": False,
            },
            format="json",
        )

        # Label each assertion so a failure names the request that broke.
        labelled_responses = (
            ("list connections", list_connections),
            ("create connection", create_connection),
            ("test connection", connection_test),
            ("copy", copy_response),
            ("list periodic tasks", list_periodic),
            ("create periodic task", create_periodic),
            ("periodic task detail", detail_periodic),
            ("patch periodic task", patch_periodic),
        )
        for label, response in labelled_responses:
            self.assertIn(response.status_code, accepted_statuses, label)

        self.assertTrue(PeriodicTask.objects.filter(id=periodic_id).exists())
        self.assertTrue(
            ExchangeConnection.objects.filter(id=active_connection.id).exists()
        )
class BackupsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke check for the backup export route."""

    def setUp(self):
        self.admin = UserFactory.create_superuser()

    @patch("apps.backups.services.BackupExportJobService._enqueue_backup_task")
    def test_backup_export_endpoint(self, enqueue_mock):
        """Export answers 202 "started", stores a job and enqueues exactly once."""
        self.authenticate(self.admin)
        export_date = timezone.localdate()
        request_body = {"actual_date": export_date.isoformat()}

        # Run on-commit callbacks registered during the request immediately.
        with self.captureOnCommitCallbacks(execute=True):
            export_response = self.client.post(
                reverse("api_v1:backups:export"),
                request_body,
                format="json",
            )

        self.assertEqual(export_response.status_code, status.HTTP_202_ACCEPTED)
        self.assertEqual(export_response.data["status"], "started")
        stored_jobs = BackupExportJob.objects.filter(actual_date=export_date)
        self.assertTrue(stored_jobs.exists())
        enqueue_mock.assert_called_once()