Files
mostovik-backend/tests/test_api_inventory_e2e.py
Aleksandr Meshchriakov b8a18d6da4
Some checks failed
CI/CD Pipeline / Quality Gate (push) Failing after 14s
CI/CD Pipeline / Build and Push Images (push) Has been skipped
CI/CD Pipeline / Deploy Dev in Dokploy (push) Has been skipped
CI/CD Pipeline / Internal Notify (push) Successful in 0s
feat: migrate parser data to source records
2026-05-19 20:21:31 +02:00

747 lines
26 KiB
Python

"""End-to-end smoke coverage for the full HTTP API inventory."""
from __future__ import annotations
import io
from datetime import date, timedelta
from pathlib import Path
from tempfile import TemporaryDirectory
from types import SimpleNamespace
from unittest.mock import patch
from apps.core.models import BackgroundJob
from apps.exchange.models import ExchangeConnection
from apps.parsers.models import (
FinancialReport,
FinancialReportLine,
ParserLoadLog,
ProcurementRecord,
)
from apps.user.services import UserService
from django.core.files.uploadedfile import SimpleUploadedFile
from django.urls import reverse
from django.utils import timezone
from django_celery_beat.models import PeriodicTask
from openpyxl import Workbook
from rest_framework import status
from rest_framework.test import APITestCase
from tests.apps.parsers.factories import (
IndustrialCertificateRecordFactory,
IndustrialProductRecordFactory,
InspectionRecordFactory,
ManufacturerRecordFactory,
ParserLoadLogFactory,
)
from tests.apps.registers.factories import RegisterFactory
from tests.apps.user.factories import UserFactory
from tests.utils.fixtures import fake
def _digits(length: int) -> str:
    """Return a string of ``length`` random decimal digits drawn from ``fake``."""
    drawn = [fake.random_int(0, 9) for _ in range(length)]
    return "".join(map(str, drawn))
def _build_fns_excel_bytes() -> bytes:
    """Build a three-row workbook with random values and return it as bytes.

    The first row carries a random year, the second the column captions and
    the third a single data row (word, 4-digit code, two random integers).
    """
    book = Workbook()
    sheet = book.active
    report_year = fake.random_int(min=2020, max=2025)
    caption_rows = (
        ["Form", None, report_year, None],
        [None, "Code", "Start", "End"],
    )
    for caption_row in caption_rows:
        sheet.append(caption_row)
    data_row = [
        fake.word(),
        _digits(4),
        fake.random_int(10, 999),
        fake.random_int(10, 999),
    ]
    sheet.append(data_row)
    # Serialise into memory so nothing touches the filesystem.
    stream = io.BytesIO()
    book.save(stream)
    book.close()
    return stream.getvalue()
def _build_register_excel_bytes(rows: list[dict[str, str]]) -> bytes:
    """Serialise ``rows`` into an in-memory workbook and return its bytes.

    The first worksheet row is the column header; every following row holds
    the values of one mapping from ``rows`` in header order. A mapping that
    lacks one of the header keys raises ``KeyError``.
    """
    columns = ("pn_name", "mn_ogrn", "mn_inn", "in_kpp", "mn_okpo")
    book = Workbook()
    sheet = book.active
    sheet.append(list(columns))
    for record in rows:
        sheet.append([record[column] for column in columns])
    stream = io.BytesIO()
    book.save(stream)
    book.close()
    return stream.getvalue()
def _extract_results(response_data):
    """Return the item collection carried by a response payload.

    Lookup order for payloads exposing ``.get``: the ``"data"`` value when it
    is a list, then the ``"results"`` value when it is not ``None``. In every
    other case the payload itself is returned unchanged.
    """
    if not hasattr(response_data, "get"):
        return response_data
    wrapped = response_data.get("data")
    if isinstance(wrapped, list):
        return wrapped
    page = response_data.get("results")
    return response_data if page is None else page
def _create_procurement_record() -> ProcurementRecord:
    """Persist and return a ``ProcurementRecord`` filled with random values."""
    # Values are generated in field order so the sequence of ``fake`` calls
    # stays the same as when they are passed directly as keyword arguments.
    field_values = {
        "load_batch": fake.random_int(min=1, max=1000),
        "purchase_number": _digits(19),
        "purchase_name": fake.sentence(nb_words=6),
        "customer_inn": _digits(10),
        "customer_kpp": _digits(9),
        "customer_ogrn": _digits(13),
        "customer_name": fake.company(),
        "max_price": str(
            fake.pydecimal(left_digits=7, right_digits=2, positive=True)
        ),
        "status": fake.word(),
        "law_type": "44-FZ",
        "href": fake.url(),
        "region_code": f"{fake.random_int(min=1, max=99):02d}",
    }
    return ProcurementRecord.objects.create(**field_values)
class AuthenticatedApiMixin:
    """Mixin that lets an API test case send requests as a given user."""

    def authenticate(self, user):
        """Set a Bearer ``Authorization`` header for ``user`` on the test client.

        Returns the token mapping obtained from ``UserService`` so callers can
        reuse it.
        """
        issued = UserService.get_tokens_for_user(user)
        bearer_header = f"Bearer {issued['access']}"
        self.client.credentials(HTTP_AUTHORIZATION=bearer_header)
        return issued
class PublicApiInventoryE2ETest(APITestCase):
    """Smoke checks for endpoints requested without credentials."""

    def test_health_and_swagger_endpoints(self):
        """Swagger UI, OpenAPI output and the three health probes answer 200."""
        fetch = self.client.get
        swagger_url = reverse("schema-swagger-ui")

        swagger = fetch(swagger_url)
        health = fetch(reverse("core:health"))
        liveness = fetch(reverse("core:liveness"))
        readiness = fetch(reverse("core:readiness"))
        openapi = fetch(swagger_url, {"format": "openapi"})

        for response in (swagger, openapi, health, liveness, readiness):
            self.assertEqual(response.status_code, status.HTTP_200_OK)
class UserApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for user authentication and admin user management."""

    def setUp(self):
        """Create the superuser that the admin-management test signs in as."""
        self.admin = UserFactory.create_superuser()

    def _assert_method_not_allowed(self, response):
        """Assert that ``response`` carries HTTP 405."""
        self.assertEqual(
            response.status_code,
            status.HTTP_405_METHOD_NOT_ALLOWED,
        )

    def test_auth_endpoints_and_disabled_self_service_endpoints(self):
        """Log in, refresh and log out; self-service routes must answer 405."""
        password = fake.password(length=16, special_chars=False)
        registration_payload = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "password": password,
            "password_confirm": password,
            "phone": f"+7{fake.numerify('##########')}",
            "first_name": "Ivan",
            "middle_name": "Ivanovich",
            "last_name": "Ivanov",
        }
        account = UserFactory.create_user(
            username=fake.unique.user_name(),
            password=password,
        )
        client = self.client

        registration = client.post(
            reverse("api_v1:user:register"),
            registration_payload,
            format="json",
        )
        self._assert_method_not_allowed(registration)

        login_credentials = {"username": account.username, "password": password}
        login = client.post(
            reverse("api_v1:user:login"),
            login_credentials,
            format="json",
        )
        self.assertEqual(login.status_code, status.HTTP_200_OK)

        verification = client.post(
            reverse("api_v1:user:token_verify"),
            {"token": login.data["access"]},
            format="json",
        )
        self._assert_method_not_allowed(verification)

        refreshed = client.post(
            reverse("api_v1:user:token_refresh"),
            {"refresh": login.data["refresh"]},
            format="json",
        )
        self.assertEqual(refreshed.status_code, status.HTTP_200_OK)

        # Every following request is sent with the refreshed access token.
        client.credentials(
            HTTP_AUTHORIZATION=f"Bearer {refreshed.data['access']}"
        )

        current_user = client.get(reverse("api_v1:user:current_user"))
        self.assertEqual(current_user.status_code, status.HTTP_200_OK)
        self.assertEqual(current_user.data["username"], account.username)

        phone_update = client.patch(
            reverse("api_v1:user:user_update"),
            {"phone": f"+7{fake.numerify('##########')}"},
            format="json",
        )
        self._assert_method_not_allowed(phone_update)

        profile_url = reverse("api_v1:user:profile_detail")
        profile_read = client.get(profile_url)
        self._assert_method_not_allowed(profile_read)

        profile_write = client.patch(
            profile_url,
            {
                "first_name": "Petr",
                "middle_name": "Petrovich",
                "last_name": "Petrov",
            },
            format="json",
        )
        self._assert_method_not_allowed(profile_write)

        profile_full = client.get(reverse("api_v1:user:profile_full"))
        self._assert_method_not_allowed(profile_full)

        replacement_password = f"{password}1"
        password_change = client.post(
            reverse("api_v1:user:password_change"),
            {
                "old_password": password,
                "new_password": replacement_password,
                "new_password_confirm": replacement_password,
            },
            format="json",
        )
        self._assert_method_not_allowed(password_change)

        logout = client.post(reverse("api_v1:user:logout"), {}, format="json")
        self.assertEqual(logout.status_code, status.HTTP_200_OK)

    def test_admin_user_management_endpoints(self):
        """List, create, read, patch, deactivate and re-activate a user as admin."""
        self.authenticate(self.admin)
        client = self.client
        collection_url = reverse("api_v1:user:admin-users")

        listing = client.get(collection_url)
        self.assertEqual(listing.status_code, status.HTTP_200_OK)
        self.assertEqual(
            set(listing.data.keys()),
            {"count", "next", "previous", "results"},
        )
        listed_users = listing.data["results"]
        self.assertTrue(listed_users)
        self.assertIn("profile", listed_users[0])

        new_user_payload = {
            "email": fake.unique.email(),
            "username": fake.unique.user_name(),
            "phone": f"+7{fake.numerify('##########')}",
            "password": "AdminManagedPass123",
            "role": "user",
            "first_name": "Alex",
            "middle_name": "Alexeevich",
            "last_name": "Alexeev",
        }
        creation = client.post(collection_url, new_user_payload, format="json")
        self.assertEqual(creation.status_code, status.HTTP_201_CREATED)
        managed_id = creation.data["id"]

        detail_url = reverse("api_v1:user:admin-user-detail", args=[managed_id])
        detail = client.get(detail_url)
        self.assertEqual(detail.status_code, status.HTTP_200_OK)

        promotion_payload = {
            "role": "admin",
            "first_name": "Sergey",
            "middle_name": "Sergeevich",
            "last_name": "Sergeev",
        }
        promotion = client.patch(detail_url, promotion_payload, format="json")
        self.assertEqual(promotion.status_code, status.HTTP_200_OK)
        self.assertEqual(promotion.data["role"], "admin")

        deactivation = client.post(
            reverse("api_v1:user:admin-user-deactivate", args=[managed_id]),
            {},
            format="json",
        )
        self.assertEqual(deactivation.status_code, status.HTTP_200_OK)

        activation = client.post(
            reverse("api_v1:user:admin-user-activate", args=[managed_id]),
            {},
            format="json",
        )
        self.assertEqual(activation.status_code, status.HTTP_200_OK)
        self.assertTrue(activation.data["is_active"])
class JobsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for the background-job list, status and stream endpoints."""

    def setUp(self):
        """Create the user that owns the job and performs the requests."""
        self.user = UserFactory.create_user()

    def _create_job(self, *, task_id: str, status_value: str) -> BackgroundJob:
        """Persist a finished ``BackgroundJob`` owned by ``self.user``.

        The job is stored with ``progress=100`` and a completion time three
        seconds after its start time.
        """
        begun = timezone.now()
        job_fields = {
            "task_id": task_id,
            "task_name": "apps.parsers.tasks.fake_task",
            "status": status_value,
            "user_id": self.user.id,
            "started_at": begun,
            "completed_at": begun + timedelta(seconds=3),
            "progress": 100,
        }
        return BackgroundJob.objects.create(**job_fields)

    def test_jobs_endpoints(self):
        """The created job is visible in the list, status and stream responses."""
        job = self._create_job(task_id="inventory-job-1", status_value="success")
        self.authenticate(self.user)
        client = self.client

        listing = client.get(reverse("api_v1:jobs:job-list"))
        self.assertEqual(listing.status_code, status.HTTP_200_OK)
        first_listed = listing.data["results"][0]
        self.assertEqual(first_listed["task_id"], job.task_id)

        job_status = client.get(
            reverse("api_v1:jobs:job-status", kwargs={"task_id": job.task_id})
        )
        self.assertEqual(job_status.status_code, status.HTTP_200_OK)
        self.assertEqual(job_status.data["status"], "success")

        stream = client.get(
            reverse("api_v1:jobs:job-stream", kwargs={"task_id": job.task_id})
        )
        self.assertEqual(stream.status_code, status.HTTP_200_OK)
        # Drain the streaming body so its text can be inspected as a whole.
        stream_text = b"".join(stream.streaming_content).decode("utf-8")
        for expected_fragment in ("event: completed", job.task_id):
            self.assertIn(expected_fragment, stream_text)
class ParsersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for parser data, source cards, parsing and system logs."""

    def setUp(self):
        """Create one regular user and one superuser."""
        self.user = UserFactory.create_user()
        self.admin = UserFactory.create_superuser()

    def _create_financial_report(self) -> FinancialReport:
        """Persist a successful API-sourced report with a single line; return it."""
        report_fields = {
            "external_id": _digits(5),
            "ogrn": _digits(13),
            "file_name": f"fin_{_digits(5)}_{_digits(13)}.xlsx",
            "file_hash": fake.sha256(raw_output=False),
            "load_batch": fake.random_int(min=1, max=1000),
            "status": FinancialReport.Status.SUCCESS,
            "source": FinancialReport.SourceType.API,
        }
        report = FinancialReport.objects.create(**report_fields)
        FinancialReportLine.objects.create(
            report=report,
            form_code="1",
            line_code="1100",
            line_name="Assets",
            year=2025,
            period_start=100,
            period_end=200,
        )
        return report

    def test_registry_read_endpoints_for_parsers_data(self):
        """Every list/detail read endpoint answers 200 for a regular user."""
        certificate = IndustrialCertificateRecordFactory()
        manufacturer = ManufacturerRecordFactory()
        product = IndustrialProductRecordFactory()
        inspection = InspectionRecordFactory()
        procurement = _create_procurement_record()
        report = self._create_financial_report()
        self.authenticate(self.user)

        # (route name, positional URL args); list routes take no args.
        endpoints = (
            ("api_v1:minpromtorg:certificates-list", []),
            ("api_v1:minpromtorg:certificates-detail", [certificate.id]),
            ("api_v1:minpromtorg:manufacturers-list", []),
            ("api_v1:minpromtorg:manufacturers-detail", [manufacturer.id]),
            ("api_v1:minpromtorg:industrial-products-list", []),
            ("api_v1:minpromtorg:industrial-products-detail", [product.id]),
            ("api_v1:proverki:inspections-list", []),
            ("api_v1:proverki:inspections-detail", [inspection.id]),
            ("api_v1:zakupki:procurements-list", []),
            ("api_v1:zakupki:procurements-detail", [procurement.id]),
            ("api_v1:fns:fns-reports-list", []),
            ("api_v1:fns:fns-reports-detail", [report.id]),
        )
        # All requests are issued first; statuses are checked afterwards.
        responses = [
            self.client.get(reverse(route, args=url_args))
            for route, url_args in endpoints
        ]
        for response in responses:
            self.assertEqual(response.status_code, status.HTTP_200_OK)

    def test_sources_parsing_and_system_endpoints(self):
        """Source cards, dashboard, parsing settings, logs and FNS upload respond."""
        inn = _digits(10)
        manufacturers_batch = 777
        IndustrialCertificateRecordFactory(inn=inn)
        IndustrialProductRecordFactory(inn=inn)
        ManufacturerRecordFactory(load_batch=manufacturers_batch, inn=inn)
        for log_source in (
            ParserLoadLog.Source.INDUSTRIAL,
            ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
        ):
            ParserLoadLogFactory(
                source=log_source,
                status="success",
                records_count=1,
            )
        manufacturers_log = ParserLoadLogFactory(
            source=ParserLoadLog.Source.MANUFACTURES,
            batch_id=manufacturers_batch,
            status="success",
            records_count=1,
        )
        client = self.client

        # Read-only endpoints, requested as a regular user.
        self.authenticate(self.user)
        cards_list = client.get(reverse("api_v1:sources:source-cards-list"))
        cards_statuses = client.get(
            reverse("api_v1:sources:source-cards-statuses")
        )
        card_detail = client.get(
            reverse(
                "api_v1:sources:source-cards-detail",
                kwargs={"slug": "manufacturers-and-products"},
            )
        )
        for response in (cards_list, cards_statuses, card_detail):
            self.assertEqual(response.status_code, status.HTTP_200_OK)

        dashboard = client.get(reverse("api_v1:stat:main-dashboard"))
        self.assertEqual(dashboard.status_code, status.HTTP_200_OK)
        dashboard_payload = dashboard.data["data"]
        self.assertIn("source_cards", dashboard_payload)
        self.assertIn("organization_stats", dashboard_payload)

        # Remaining endpoints are requested as the superuser.
        self.authenticate(self.admin)
        settings_url = reverse("api_v1:parsing:parsing-settings")
        settings_read = client.get(settings_url)
        settings_write = client.patch(
            settings_url,
            {"planned_inspections": "weekly"},
            format="json",
        )
        logs_list = client.get(reverse("api_v1:system:parser-logs-list"))
        logs_detail = client.get(
            reverse("api_v1:system:parser-logs-detail", args=[manufacturers_log.id])
        )
        logs_export = client.get(reverse("api_v1:system:parser-logs-export"))

        with TemporaryDirectory() as scratch_dir:
            scratch = Path(scratch_dir)
            card_refresh = client.post(
                reverse(
                    "api_v1:sources:source-cards-refresh",
                    kwargs={"slug": "financial-indicators"},
                ),
                {},
                format="json",
            )
            workbook_file = SimpleUploadedFile(
                f"fin_{_digits(5)}_{_digits(13)}.xlsx",
                _build_fns_excel_bytes(),
                content_type=(
                    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
                ),
            )
            # Point the FNS directories at the scratch dir for the upload only.
            fns_directories = {
                "FNS_WATCH_DIRECTORY": str(scratch / "watch"),
                "FNS_PROCESSED_DIRECTORY": str(scratch / "processed"),
                "FNS_FAILED_DIRECTORY": str(scratch / "failed"),
            }
            with self.settings(**fns_directories):
                fns_upload = client.post(
                    reverse("api_v1:fns:fns-upload"),
                    {"file": workbook_file},
                    format="multipart",
                )

        accepted_statuses = {status.HTTP_200_OK, status.HTTP_202_ACCEPTED}
        admin_responses = (
            settings_read,
            settings_write,
            logs_list,
            logs_detail,
            logs_export,
            card_refresh,
            fns_upload,
        )
        for response in admin_responses:
            self.assertIn(response.status_code, accepted_statuses)
class RegistersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for the register upload and register read endpoints."""

    def setUp(self):
        """Create one regular user and one superuser."""
        self.user = UserFactory.create_user()
        self.admin = UserFactory.create_superuser()

    def test_registers_endpoints(self):
        """Upload a one-row workbook as admin, then read the data as a user."""
        registry = RegisterFactory(name="Inventory Registry")
        rows = [
            {
                "pn_name": "Inventory Org",
                "mn_ogrn": "1027600980990",
                "mn_inn": "7601000086",
                "in_kpp": "760401001",
                "mn_okpo": "07506197",
            }
        ]

        self.authenticate(self.admin)
        upload = SimpleUploadedFile(
            "inventory-registry.xlsx",
            _build_register_excel_bytes(rows),
            content_type=(
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
            ),
        )
        upload_response = self.client.post(
            reverse("api_v1:registers:register-upload"),
            {
                "registry": str(registry.id),
                "actual_date": date(2026, 1, 1).isoformat(),
                "file": upload,
            },
            format="multipart",
        )
        self.assertEqual(upload_response.status_code, status.HTTP_201_CREATED)

        self.authenticate(self.user)
        registries_list = self.client.get(reverse("api_v1:registers:registries-list"))
        registries_detail = self.client.get(
            reverse("api_v1:registers:registries-detail", args=[registry.id])
        )
        organizations_list = self.client.get(
            reverse("api_v1:registers:organizations-list")
        )

        # Check the list response before indexing into it, so a failing or
        # empty list is reported as an assertion failure rather than an
        # IndexError/KeyError raised while reading the first item.
        self.assertEqual(organizations_list.status_code, status.HTTP_200_OK)
        organizations = _extract_results(organizations_list.data)
        self.assertTrue(
            organizations,
            "organizations list returned no items after the registry upload",
        )
        organization_id = organizations[0]["id"]

        organization_detail = self.client.get(
            reverse("api_v1:registers:organizations-detail", args=[organization_id])
        )
        registry_organizations = self.client.get(
            reverse(
                "api_v1:registers:registry-organizations-list",
                args=[registry.id],
            )
        )
        for response in (
            registries_list,
            registries_detail,
            organizations_list,
            organization_detail,
            registry_organizations,
        ):
            self.assertEqual(response.status_code, status.HTTP_200_OK)
class ExchangeApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for the exchange connection and periodic-task endpoints."""

    def setUp(self):
        """Create the superuser that performs every request."""
        self.admin = UserFactory.create_superuser()

    # Decorators apply bottom-up, so the mock arguments arrive in the reverse
    # of the order in which the decorators are written.
    @patch("apps.exchange.services.ExchangeConnectionService.validate_target_structure")
    @patch("apps.exchange.services.ExchangeConnectionService.prepare_target_structure")
    @patch("apps.exchange.services.ExchangeConnectionService.test_connection")
    @patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload")
    def test_exchange_endpoints(
        self,
        test_connection_payload_mock,
        _test_connection_mock,
        _prepare_mock,
        _validate_mock,
    ):
        """Exercise connection and periodic-task routes with the service patched.

        Connection and periodic-task list/create/patch calls must answer 200
        or 201; the copy, tables and periodic-task detail GET calls must
        answer 405.
        """
        self.authenticate(self.admin)
        connections_url = reverse("api_v1:exchange:connections")
        test_connection_url = reverse("api_v1:exchange:connections-test")
        copy_url = reverse("api_v1:exchange:copy")
        tables_url = reverse("api_v1:exchange:tables")
        periodic_tasks_url = reverse("api_v1:exchange:periodic-tasks")
        connection_payload = {
            "server": "127.0.0.1",
            "port": 5432,
            "username": "postgres",
            "password": "secret",
            "database_name": "target_db",
            "schema_name": "public",
        }
        test_connection_payload_mock.return_value = {
            "status": "success",
            "message": "ok",
        }
        _test_connection_mock.return_value = "exchange_target_test"
        success_statuses = {status.HTTP_200_OK, status.HTTP_201_CREATED}

        list_connections = self.client.get(connections_url)
        create_connection = self.client.post(
            connections_url,
            connection_payload,
            format="json",
        )
        connection_test = self.client.post(
            test_connection_url,
            connection_payload,
            format="json",
        )
        # Check the create status before reading its payload, so a rejected
        # request fails on the status code instead of a KeyError on "id".
        self.assertIn(create_connection.status_code, success_statuses)
        active_connection = ExchangeConnection.objects.get(
            id=create_connection.data["id"]
        )

        copy_response = self.client.post(copy_url, {"mode": "all"}, format="json")
        tables_response = self.client.get(tables_url)
        list_periodic = self.client.get(periodic_tasks_url)
        create_periodic = self.client.post(
            periodic_tasks_url,
            {
                "schedule_type": "interval",
                "interval_every": 1,
                "interval_period": "hours",
                "notify_on_error": True,
            },
            format="json",
        )
        # Same guard as above for the periodic-task payload.
        self.assertIn(create_periodic.status_code, success_statuses)
        periodic_id = create_periodic.data["id"]

        periodic_detail_url = reverse(
            "api_v1:exchange:periodic-task-detail",
            args=[periodic_id],
        )
        detail_periodic = self.client.get(periodic_detail_url)
        patch_periodic = self.client.patch(
            periodic_detail_url,
            {
                "schedule_type": "daily",
                "crontab_hour": 2,
                "crontab_minute": 30,
                "notify_on_error": False,
            },
            format="json",
        )

        for response in (
            list_connections,
            create_connection,
            connection_test,
            list_periodic,
            create_periodic,
            patch_periodic,
        ):
            self.assertIn(response.status_code, success_statuses)

        self.assertEqual(copy_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
        self.assertEqual(tables_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED)
        self.assertEqual(
            detail_periodic.status_code,
            status.HTTP_405_METHOD_NOT_ALLOWED,
        )
        self.assertTrue(PeriodicTask.objects.filter(id=periodic_id).exists())
        self.assertTrue(
            ExchangeConnection.objects.filter(id=active_connection.id).exists()
        )
class BackupsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
    """Smoke coverage for the backup export endpoint."""

    def setUp(self):
        """Create the superuser that requests the export."""
        self.admin = UserFactory.create_superuser()

    @patch("apps.backups.views.StateCorpExchangeService.build_package")
    def test_backup_export_endpoint(self, build_package_mock):
        """The export returns the patched package bytes as a zip attachment."""
        self.authenticate(self.admin)
        export_date = timezone.localdate()
        # Stand-in for the package object the patched builder would return.
        stub_package = SimpleNamespace(
            package_id="package-1",
            archive_name="state_corp_exchange.zip",
            bin_name="state_corp_exchange.bin",
            archive_bytes=b"zip-bytes",
            payload_counts={"organizations": 1},
            produced_at="2026-05-12T10:00:00+00:00",
        )
        build_package_mock.return_value = stub_package

        export_url = reverse("api_v1:backups:export")
        request_body = {"actual_date": export_date.isoformat()}
        response = self.client.post(export_url, request_body, format="json")

        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.content, b"zip-bytes")
        expected_disposition = 'attachment; filename="state_corp_exchange.zip"'
        self.assertIn(expected_disposition, response["Content-Disposition"])
        build_package_mock.assert_called_once_with(actual_date=export_date)