Files
mostovik-backend/tests/test_api_inventory_e2e.py
Aleksandr Meshchriakov 75c1d4cf1a
Some checks failed
CI/CD Pipeline / Quality Gate (push) Successful in 33s
CI/CD Pipeline / Build and Push Images (push) Successful in 10s
CI/CD Pipeline / Internal Notify (push) Successful in 0s
CI/CD Pipeline / Deploy Dev in Dokploy (push) Failing after 9s
feat: export state corp package from backup endpoint
2026-05-12 15:12:56 +02:00

736 lines
26 KiB
Python

"""End-to-end smoke coverage for the full HTTP API inventory."""
from __future__ import annotations
import io
from datetime import date, timedelta
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
from unittest.mock import patch
from apps.core.models import BackgroundJob
from apps.exchange.models import ExchangeConnection
from apps.parsers.models import (
FinancialReport,
FinancialReportLine,
ParserLoadLog,
ProcurementRecord,
)
from apps.user.services import UserService
from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse
from django.utils import timezone
from django_celery_beat.models import PeriodicTask
from openpyxl import Workbook
from rest_framework import status
from rest_framework.test import APITestCase
from tests.apps.parsers.factories import (
IndustrialCertificateRecordFactory,
IndustrialProductRecordFactory,
InspectionRecordFactory,
ManufacturerRecordFactory,
ParserLoadLogFactory,
)
from tests.apps.registers.factories import RegisterFactory
from tests.apps.user.factories import UserFactory
from tests.utils.fixtures import fake
def _digits(length: int) -> str:
    """Return a string made of ``length`` random decimal digits.

    Each digit is drawn independently with ``fake.random_int(0, 9)``, so the
    result may start with ``0``. A non-positive ``length`` yields ``""``.
    """
    picked = [fake.random_int(0, 9) for _ in range(length)]
    return "".join(map(str, picked))
def _build_fns_excel_bytes() -> bytes:
    """Build a small in-memory ``.xlsx`` workbook and return its raw bytes.

    The active sheet gets three rows: a header row carrying a random year
    between 2020 and 2025, a column-title row, and one data row filled with
    random values from ``fake``.
    """
    book = Workbook()
    sheet = book.active

    report_year = fake.random_int(min=2020, max=2025)
    sheet_rows = (
        ["Form", None, report_year, None],
        [None, "Code", "Start", "End"],
        [
            fake.word(),
            _digits(4),
            fake.random_int(10, 999),
            fake.random_int(10, 999),
        ],
    )
    for sheet_row in sheet_rows:
        sheet.append(sheet_row)

    # Serialize into memory so no file is left behind on disk.
    stream = io.BytesIO()
    book.save(stream)
    book.close()
    return stream.getvalue()
def _build_register_excel_bytes(rows: list[dict[str, str]]) -> bytes:
    """Serialize ``rows`` into an in-memory ``.xlsx`` workbook.

    The first sheet row is the fixed column header; every following row holds
    the values of one input mapping in the same column order.

    Args:
        rows: Mappings that each contain the keys ``pn_name``, ``mn_ogrn``,
            ``mn_inn``, ``in_kpp`` and ``mn_okpo``.

    Returns:
        The workbook content as bytes.

    Raises:
        KeyError: If a row lacks one of the expected keys.
    """
    columns = ("pn_name", "mn_ogrn", "mn_inn", "in_kpp", "mn_okpo")

    book = Workbook()
    sheet = book.active
    sheet.append(list(columns))
    for record in rows:
        sheet.append([record[column] for column in columns])

    stream = io.BytesIO()
    book.save(stream)
    book.close()
    return stream.getvalue()
def _extract_results(response_data):
if hasattr(response_data, "get"):
data = response_data.get("data")
if isinstance(data, list):
return data
results = response_data.get("results")
if results is not None:
return results
return response_data
def _create_procurement_record() -> ProcurementRecord:
    """Persist and return a ``ProcurementRecord`` filled with random data.

    Only ``law_type`` is fixed (``"44-FZ"``); every other field comes from
    ``fake`` or ``_digits``.
    """
    # Dict literals evaluate top to bottom, so the random values are drawn in
    # the same order as the fields are listed.
    attributes = {
        "load_batch": fake.random_int(min=1, max=1000),
        "purchase_number": _digits(19),
        "purchase_name": fake.sentence(nb_words=6),
        "customer_inn": _digits(10),
        "customer_kpp": _digits(9),
        "customer_ogrn": _digits(13),
        "customer_name": fake.company(),
        "max_price": str(
            fake.pydecimal(left_digits=7, right_digits=2, positive=True)
        ),
        "status": fake.word(),
        "law_type": "44-FZ",
        "href": fake.url(),
        "region_code": f"{fake.random_int(min=1, max=99):02d}",
    }
    return ProcurementRecord.objects.create(**attributes)
class AuthenticatedApiMixin:
    """Mixin that adds a bearer-token login helper to API test cases.

    Expects the host class to provide ``self.client`` with a ``credentials``
    method.
    """

    def authenticate(self, user):
        """Authorize subsequent client requests as ``user``.

        Tokens are obtained from ``UserService.get_tokens_for_user`` and the
        ``access`` token is installed as the ``Authorization: Bearer`` header.

        Returns:
            The full token mapping returned by the service.
        """
        issued = UserService.get_tokens_for_user(user)
        access_token = issued["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {access_token}")
        return issued
class PublicApiInventoryE2ETest(APITestCase):
    """Smoke checks for the routes requested without credentials."""

    def test_health_and_swagger_endpoints(self):
        """Swagger UI, its OpenAPI rendering and the health probes return 200."""
        swagger_url = reverse("schema-swagger-ui")

        swagger = self.client.get(swagger_url)
        health = self.client.get(reverse("core:health"))
        liveness = self.client.get(reverse("core:liveness"))
        readiness = self.client.get(reverse("core:readiness"))
        # Same route as the UI, asked to render the raw schema instead.
        openapi = self.client.get(swagger_url, {"format": "openapi"})

        for reply in (swagger, openapi, health, liveness, readiness):
            self.assertEqual(reply.status_code, status.HTTP_200_OK)
class UserApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for the self-service and admin user-management routes."""

    def setUp(self):
        """Create the superuser that the admin scenario authenticates as."""
        self.admin = UserFactory.create_superuser()

    def test_auth_and_profile_endpoints(self):
        """Walk register, login, token, profile, password and logout routes."""
        first_password = fake.password(length=16, special_chars=False)
        second_password = fake.password(length=18, special_chars=False)
        signup = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "password": first_password,
            "password_confirm": first_password,
            "phone": f"+7{fake.numerify('##########')}",
            "first_name": "Ivan",
            "middle_name": "Ivanovich",
            "last_name": "Ivanov",
        }
        username = signup["username"]

        # Registration.
        registered = self.client.post(
            reverse("api_v1:user:register"), signup, format="json"
        )
        self.assertEqual(registered.status_code, status.HTTP_201_CREATED)
        self.assertIn("tokens", registered.data)

        # Login with the initial password.
        login_url = reverse("api_v1:user:login")
        logged_in = self.client.post(
            login_url,
            {"username": username, "password": first_password},
            format="json",
        )
        self.assertEqual(logged_in.status_code, status.HTTP_200_OK)

        # Token verification and refresh.
        verified = self.client.post(
            reverse("api_v1:user:token_verify"),
            {"token": logged_in.data["access"]},
            format="json",
        )
        self.assertEqual(verified.status_code, status.HTTP_200_OK)

        refreshed = self.client.post(
            reverse("api_v1:user:token_refresh"),
            {"refresh": logged_in.data["refresh"]},
            format="json",
        )
        self.assertEqual(refreshed.status_code, status.HTTP_200_OK)

        # Continue with the refreshed access token.
        refreshed_access = refreshed.data["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {refreshed_access}")

        current_user = self.client.get(reverse("api_v1:user:current_user"))
        self.assertEqual(current_user.status_code, status.HTTP_200_OK)
        self.assertEqual(current_user.data["username"], username)

        user_patch = self.client.patch(
            reverse("api_v1:user:user_update"),
            {"phone": f"+7{fake.numerify('##########')}"},
            format="json",
        )
        self.assertEqual(user_patch.status_code, status.HTTP_200_OK)

        # Profile read, partial update and full view.
        profile_url = reverse("api_v1:user:profile_detail")
        profile = self.client.get(profile_url)
        self.assertEqual(profile.status_code, status.HTTP_200_OK)

        renamed = {
            "first_name": "Petr",
            "middle_name": "Petrovich",
            "last_name": "Petrov",
        }
        profile_patch = self.client.patch(profile_url, renamed, format="json")
        self.assertEqual(profile_patch.status_code, status.HTTP_200_OK)

        profile_full = self.client.get(reverse("api_v1:user:profile_full"))
        self.assertEqual(profile_full.status_code, status.HTTP_200_OK)
        self.assertEqual(profile_full.data["username"], username)

        # Password change followed by a login with the new password.
        password_change = self.client.post(
            reverse("api_v1:user:password_change"),
            {
                "old_password": first_password,
                "new_password": second_password,
                "new_password_confirm": second_password,
            },
            format="json",
        )
        self.assertEqual(password_change.status_code, status.HTTP_200_OK)

        relogged_in = self.client.post(
            login_url,
            {"username": username, "password": second_password},
            format="json",
        )
        self.assertEqual(relogged_in.status_code, status.HTTP_200_OK)

        relogin_access = relogged_in.data["access"]
        self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {relogin_access}")

        logged_out = self.client.post(
            reverse("api_v1:user:logout"), {}, format="json"
        )
        self.assertEqual(logged_out.status_code, status.HTTP_200_OK)

    def test_admin_user_management_endpoints(self):
        """List, create, read, patch, deactivate and reactivate a user as admin."""
        self.authenticate(self.admin)
        collection_url = reverse("api_v1:user:admin-users")

        listing = self.client.get(collection_url)
        self.assertEqual(listing.status_code, status.HTTP_200_OK)
        # The list response must use exactly the paginated envelope keys.
        self.assertEqual(
            set(listing.data.keys()),
            {"count", "next", "previous", "results"},
        )
        listed_users = listing.data["results"]
        self.assertTrue(listed_users)
        self.assertIn("profile", listed_users[0])

        new_user = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "phone": f"+7{fake.numerify('##########')}",
            "password": "AdminManagedPass123",
            "role": "user",
            "first_name": "Alex",
            "middle_name": "Alexeevich",
            "last_name": "Alexeev",
        }
        created = self.client.post(collection_url, new_user, format="json")
        self.assertEqual(created.status_code, status.HTTP_201_CREATED)
        managed_id = created.data["id"]

        item_url = reverse("api_v1:user:admin-user-detail", args=[managed_id])
        fetched = self.client.get(item_url)
        self.assertEqual(fetched.status_code, status.HTTP_200_OK)

        promotion = {
            "role": "admin",
            "first_name": "Sergey",
            "middle_name": "Sergeevich",
            "last_name": "Sergeev",
        }
        patched = self.client.patch(item_url, promotion, format="json")
        self.assertEqual(patched.status_code, status.HTTP_200_OK)
        self.assertEqual(patched.data["role"], "admin")

        deactivated = self.client.post(
            reverse("api_v1:user:admin-user-deactivate", args=[managed_id]),
            {},
            format="json",
        )
        self.assertEqual(deactivated.status_code, status.HTTP_200_OK)

        activated = self.client.post(
            reverse("api_v1:user:admin-user-activate", args=[managed_id]),
            {},
            format="json",
        )
        self.assertEqual(activated.status_code, status.HTTP_200_OK)
        self.assertTrue(activated.data["is_active"])
class JobsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for the background-job list, status and stream routes."""

    def setUp(self):
        """Create the regular user that owns the jobs in this test case."""
        self.user = UserFactory.create_user()

    def _create_job(self, *, task_id: str, status_value: str) -> BackgroundJob:
        """Persist a ``BackgroundJob`` owned by ``self.user``.

        The job is stored at 100% progress, with ``completed_at`` three
        seconds after ``started_at``.

        Args:
            task_id: Identifier stored on the job.
            status_value: Value stored in the job's ``status`` field.
        """
        begun = timezone.now()
        job_fields = {
            "task_id": task_id,
            "task_name": "apps.parsers.tasks.fake_task",
            "status": status_value,
            "user_id": self.user.id,
            "started_at": begun,
            "completed_at": begun + timedelta(seconds=3),
            "progress": 100,
        }
        return BackgroundJob.objects.create(**job_fields)

    def test_jobs_endpoints(self):
        """A finished job is listed, reports its status and streams an event."""
        job = self._create_job(task_id="inventory-job-1", status_value="success")
        self.authenticate(self.user)

        listing = self.client.get(reverse("api_v1:jobs:job-list"))
        self.assertEqual(listing.status_code, status.HTTP_200_OK)
        self.assertEqual(listing.data["results"][0]["task_id"], job.task_id)

        job_route = {"task_id": job.task_id}
        job_state = self.client.get(
            reverse("api_v1:jobs:job-status", kwargs=job_route)
        )
        self.assertEqual(job_state.status_code, status.HTTP_200_OK)
        self.assertEqual(job_state.data["status"], "success")

        stream = self.client.get(reverse("api_v1:jobs:job-stream", kwargs=job_route))
        self.assertEqual(stream.status_code, status.HTTP_200_OK)
        # Drain the streaming response so the emitted events can be inspected.
        streamed_bytes = b"".join(stream.streaming_content)
        streamed_text = streamed_bytes.decode("utf-8")
        self.assertIn("event: completed", streamed_text)
        self.assertIn(job.task_id, streamed_text)
class ParsersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for parser registries, source cards, settings and logs."""

    def setUp(self):
        """Create one regular user and one superuser for the scenarios."""
        self.user = UserFactory.create_user()
        self.admin = UserFactory.create_superuser()

    def _create_financial_report(self) -> FinancialReport:
        """Persist a successful API-sourced report with a single line item."""
        # Random identifiers are drawn in field order.
        report_fields = {
            "external_id": _digits(5),
            "ogrn": _digits(13),
            "file_name": f"fin_{_digits(5)}_{_digits(13)}.xlsx",
            "file_hash": fake.sha256(raw_output=False),
            "load_batch": fake.random_int(min=1, max=1000),
            "status": FinancialReport.Status.SUCCESS,
            "source": FinancialReport.SourceType.API,
        }
        financial_report = FinancialReport.objects.create(**report_fields)
        FinancialReportLine.objects.create(
            report=financial_report,
            form_code="1",
            line_code="1100",
            line_name="Assets",
            year=2025,
            period_start=100,
            period_end=200,
        )
        return financial_report

    def test_registry_read_endpoints_for_parsers_data(self):
        """Every registry list and detail route answers 200 for a regular user."""
        certificate = IndustrialCertificateRecordFactory()
        manufacturer = ManufacturerRecordFactory()
        product = IndustrialProductRecordFactory()
        inspection = InspectionRecordFactory()
        procurement = _create_procurement_record()
        report = self._create_financial_report()
        self.authenticate(self.user)

        # (route name, positional URL args); ``None`` marks a list route.
        routes = (
            ("api_v1:minpromtorg:certificates-list", None),
            ("api_v1:minpromtorg:certificates-detail", [certificate.id]),
            ("api_v1:minpromtorg:manufacturers-list", None),
            ("api_v1:minpromtorg:manufacturers-detail", [manufacturer.id]),
            ("api_v1:minpromtorg:industrial-products-list", None),
            ("api_v1:minpromtorg:industrial-products-detail", [product.id]),
            ("api_v1:proverki:inspections-list", None),
            ("api_v1:proverki:inspections-detail", [inspection.id]),
            ("api_v1:zakupki:procurements-list", None),
            ("api_v1:zakupki:procurements-detail", [procurement.id]),
            ("api_v1:fns:fns-reports-list", None),
            ("api_v1:fns:fns-reports-detail", [report.id]),
        )
        # Issue every request first, then check the statuses in request order.
        replies = [
            self.client.get(reverse(route_name, args=route_args))
            for route_name, route_args in routes
        ]
        for reply in replies:
            self.assertEqual(reply.status_code, status.HTTP_200_OK)

    def test_sources_parsing_and_system_endpoints(self):
        """Source cards, parsing settings, parser logs and the FNS upload respond."""
        common_inn = _digits(10)
        IndustrialCertificateRecordFactory(inn=common_inn)
        IndustrialProductRecordFactory(inn=common_inn)
        ManufacturerRecordFactory(load_batch=777, inn=common_inn)

        success_log = {"status": "success", "records_count": 1}
        ParserLoadLogFactory(source=ParserLoadLog.Source.INDUSTRIAL, **success_log)
        ParserLoadLogFactory(
            source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS, **success_log
        )
        manufacturers_log = ParserLoadLogFactory(
            source=ParserLoadLog.Source.MANUFACTURES,
            batch_id=777,
            **success_log,
        )

        # Source cards are requested as a regular user.
        self.authenticate(self.user)
        cards = self.client.get(reverse("api_v1:sources:source-cards-list"))
        card_statuses = self.client.get(
            reverse("api_v1:sources:source-cards-statuses")
        )
        card_detail = self.client.get(
            reverse(
                "api_v1:sources:source-cards-detail",
                kwargs={"slug": "manufacturers-and-products"},
            )
        )
        for reply in (cards, card_statuses, card_detail):
            self.assertEqual(reply.status_code, status.HTTP_200_OK)

        # The remaining requests are made as the superuser.
        self.authenticate(self.admin)
        settings_url = reverse("api_v1:parsing:parsing-settings")
        settings_read = self.client.get(settings_url)
        settings_patch = self.client.patch(
            settings_url,
            {"planned_inspections": "weekly"},
            format="json",
        )
        log_listing = self.client.get(reverse("api_v1:system:parser-logs-list"))
        log_detail = self.client.get(
            reverse("api_v1:system:parser-logs-detail", args=[manufacturers_log.id])
        )
        log_export = self.client.get(reverse("api_v1:system:parser-logs-export"))

        xlsx_content_type = (
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        )
        with TemporaryDirectory() as scratch_dir:
            scratch = Path(scratch_dir)
            refresh = self.client.post(
                reverse(
                    "api_v1:sources:source-cards-refresh",
                    kwargs={"slug": "financial-indicators"},
                ),
                {},
                format="json",
            )
            upload_name = f"fin_{_digits(5)}_{_digits(13)}.xlsx"
            workbook_file = SimpleUploadedFile(
                upload_name,
                _build_fns_excel_bytes(),
                content_type=xlsx_content_type,
            )
            # Point the FNS directories at the scratch dir for the upload.
            with self.settings(
                FNS_WATCH_DIRECTORY=str(scratch / "watch"),
                FNS_PROCESSED_DIRECTORY=str(scratch / "processed"),
                FNS_FAILED_DIRECTORY=str(scratch / "failed"),
            ):
                fns_upload = self.client.post(
                    reverse("api_v1:fns:fns-upload"),
                    {"file": workbook_file},
                    format="multipart",
                )

        accepted_statuses = {status.HTTP_200_OK, status.HTTP_202_ACCEPTED}
        admin_replies = (
            settings_read,
            settings_patch,
            log_listing,
            log_detail,
            log_export,
            refresh,
            fns_upload,
        )
        for reply in admin_replies:
            self.assertIn(reply.status_code, accepted_statuses)
class RegistersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for the register upload and read routes."""

    def setUp(self):
        """Create one regular user and one superuser for the scenario."""
        self.user = UserFactory.create_user()
        self.admin = UserFactory.create_superuser()

    def test_registers_endpoints(self):
        """Upload a one-row register as admin, then read it back as a user."""
        registry = RegisterFactory(name="Inventory Registry")
        rows = [
            {
                "pn_name": "Inventory Org",
                "mn_ogrn": "1027600980990",
                "mn_inn": "7601000086",
                "in_kpp": "760401001",
                "mn_okpo": "07506197",
            }
        ]

        self.authenticate(self.admin)
        upload = SimpleUploadedFile(
            "inventory-registry.xlsx",
            _build_register_excel_bytes(rows),
            content_type=(
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
            ),
        )
        upload_response = self.client.post(
            reverse("api_v1:registers:register-upload"),
            {
                "registry": str(registry.id),
                "actual_date": date(2026, 1, 1).isoformat(),
                "file": upload,
            },
            format="multipart",
        )
        self.assertEqual(upload_response.status_code, status.HTTP_201_CREATED)

        self.authenticate(self.user)
        registries_list = self.client.get(reverse("api_v1:registers:registries-list"))
        registries_detail = self.client.get(
            reverse("api_v1:registers:registries-detail", args=[registry.id])
        )
        organizations_list = self.client.get(
            reverse("api_v1:registers:organizations-list")
        )

        # Fail fast with a readable assertion: indexing an error payload or an
        # empty list would otherwise raise an opaque KeyError/IndexError.
        self.assertEqual(organizations_list.status_code, status.HTTP_200_OK)
        organizations = _extract_results(organizations_list.data)
        self.assertTrue(
            organizations,
            "organizations list is empty after the register upload",
        )
        organization_id = organizations[0]["id"]

        organization_detail = self.client.get(
            reverse("api_v1:registers:organizations-detail", args=[organization_id])
        )
        registry_organizations = self.client.get(
            reverse(
                "api_v1:registers:registry-organizations-list",
                args=[registry.id],
            )
        )
        for response in (
            registries_list,
            registries_detail,
            organizations_list,
            organization_detail,
            registry_organizations,
        ):
            self.assertEqual(response.status_code, status.HTTP_200_OK)
class ExchangeApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for exchange connections, copy and periodic-task routes."""

    def setUp(self):
        """Create the superuser that performs every request in this case."""
        self.admin = UserFactory.create_superuser()

    # ``patch`` decorators are applied bottom-up: the mock created by the
    # lowest decorator is passed as the first argument after ``self``.
    @patch("apps.exchange.services.ExchangeConnectionService.validate_target_structure")
    @patch("apps.exchange.services.ExchangeConnectionService.prepare_target_structure")
    @patch("apps.exchange.services.ExchangeConnectionService.test_connection")
    @patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload")
    @patch("apps.exchange.views.copy_parsers_data_async")
    @patch("apps.exchange.services.ExchangeConnectionService.get_active_connection")
    def test_exchange_endpoints(
        self,
        get_active_connection_mock,
        copy_task_mock,
        test_connection_payload_mock,
        _test_connection_mock,
        _prepare_mock,
        _validate_mock,
    ):
        """Exercise the connection, copy and periodic-task routes as admin.

        The patched ``ExchangeConnectionService`` methods and the copy task
        are replaced with mocks for the whole test.
        """
        self.authenticate(self.admin)
        # Any of these codes counts as success for every endpoint below.
        accepted_statuses = {
            status.HTTP_200_OK,
            status.HTTP_201_CREATED,
            status.HTTP_202_ACCEPTED,
        }
        connections_url = reverse("api_v1:exchange:connections")
        test_connection_url = reverse("api_v1:exchange:connections-test")
        copy_url = reverse("api_v1:exchange:copy")
        periodic_tasks_url = reverse("api_v1:exchange:periodic-tasks")
        connection_payload = {
            "server": "127.0.0.1",
            "port": 5432,
            "username": "postgres",
            "password": "secret",
            "database_name": "target_db",
            "schema_name": "public",
        }
        test_connection_payload_mock.return_value = {
            "status": "success",
            "message": "ok",
        }
        _test_connection_mock.return_value = "exchange_target_test"

        list_connections = self.client.get(connections_url)
        create_connection = self.client.post(
            connections_url,
            connection_payload,
            format="json",
        )
        # Fail fast: reading ``data["id"]`` from a failed create would raise
        # KeyError/DoesNotExist instead of a readable status mismatch.
        self.assertIn(create_connection.status_code, accepted_statuses)
        connection_test = self.client.post(
            test_connection_url,
            connection_payload,
            format="json",
        )
        active_connection = ExchangeConnection.objects.get(
            id=create_connection.data["id"]
        )
        get_active_connection_mock.return_value = active_connection
        copy_task_mock.delay.return_value = SimpleNamespace(id="exchange-task-1")
        copy_response = self.client.post(copy_url, {"mode": "all"}, format="json")

        list_periodic = self.client.get(periodic_tasks_url)
        create_periodic = self.client.post(
            periodic_tasks_url,
            {
                "schedule_type": "interval",
                "interval_every": 1,
                "interval_period": "hours",
                "notify_on_error": True,
            },
            format="json",
        )
        # Same guard before the periodic-task id is dereferenced.
        self.assertIn(create_periodic.status_code, accepted_statuses)
        periodic_id = create_periodic.data["id"]
        periodic_detail_url = reverse(
            "api_v1:exchange:periodic-task-detail",
            args=[periodic_id],
        )
        detail_periodic = self.client.get(periodic_detail_url)
        patch_periodic = self.client.patch(
            periodic_detail_url,
            {
                "schedule_type": "daily",
                "crontab_hour": 2,
                "crontab_minute": 30,
                "notify_on_error": False,
            },
            format="json",
        )

        for response in (
            list_connections,
            create_connection,
            connection_test,
            copy_response,
            list_periodic,
            create_periodic,
            detail_periodic,
            patch_periodic,
        ):
            self.assertIn(response.status_code, accepted_statuses)
        self.assertTrue(PeriodicTask.objects.filter(id=periodic_id).exists())
        self.assertTrue(
            ExchangeConnection.objects.filter(id=active_connection.id).exists()
        )
class BackupsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for the backup export route."""

    def setUp(self):
        """Create the superuser that requests the export."""
        self.admin = UserFactory.create_superuser()

    @patch("apps.backups.views.StateCorpExchangeService.build_package")
    def test_backup_export_endpoint(self, build_package_mock):
        """The export route returns the built archive as an attachment.

        ``build_package`` is mocked, so the test checks only how the view
        passes the date through and how it shapes the HTTP response.
        """
        self.authenticate(self.admin)
        export_date = timezone.localdate()
        stub_package = SimpleNamespace(
            package_id="package-1",
            archive_name="state_corp_exchange.zip",
            bin_name="state_corp_exchange.bin",
            archive_bytes=b"zip-bytes",
            payload_counts={"organizations": 1},
            produced_at="2026-05-12T10:00:00+00:00",
        )
        build_package_mock.return_value = stub_package

        export = self.client.post(
            reverse("api_v1:backups:export"),
            {"actual_date": export_date.isoformat()},
            format="json",
        )

        self.assertEqual(export.status_code, status.HTTP_200_OK)
        self.assertEqual(export.content, b"zip-bytes")
        disposition = export["Content-Disposition"]
        self.assertIn('attachment; filename="state_corp_exchange.zip"', disposition)
        # The ISO string from the request must reach the service as a date.
        build_package_mock.assert_called_once_with(actual_date=export_date)