"""End-to-end smoke coverage for the full HTTP API inventory.""" from __future__ import annotations import io from datetime import date, timedelta from pathlib import Path from tempfile import TemporaryDirectory from types import SimpleNamespace from unittest.mock import patch from apps.core.models import BackgroundJob from apps.exchange.models import ExchangeConnection from apps.parsers.models import ( FinancialReport, FinancialReportLine, ParserLoadLog, ProcurementRecord, ) from apps.user.services import UserService from django.core.files.uploadedfile import SimpleUploadedFile from django.urls import reverse from django.utils import timezone from django_celery_beat.models import PeriodicTask from openpyxl import Workbook from rest_framework import status from rest_framework.test import APITestCase from tests.apps.parsers.factories import ( IndustrialCertificateRecordFactory, IndustrialProductRecordFactory, InspectionRecordFactory, ManufacturerRecordFactory, ParserLoadLogFactory, ) from tests.apps.registers.factories import RegisterFactory from tests.apps.user.factories import UserFactory from tests.utils.fixtures import fake def _digits(length: int) -> str: return "".join(str(fake.random_int(0, 9)) for _ in range(length)) def _build_fns_excel_bytes() -> bytes: workbook = Workbook() worksheet = workbook.active year = fake.random_int(min=2020, max=2025) worksheet.append(["Form", None, year, None]) worksheet.append([None, "Code", "Start", "End"]) worksheet.append( [fake.word(), _digits(4), fake.random_int(10, 999), fake.random_int(10, 999)] ) buffer = io.BytesIO() workbook.save(buffer) workbook.close() return buffer.getvalue() def _build_register_excel_bytes(rows: list[dict[str, str]]) -> bytes: workbook = Workbook() worksheet = workbook.active worksheet.append(["pn_name", "mn_ogrn", "mn_inn", "in_kpp", "mn_okpo"]) for row in rows: worksheet.append( [ row["pn_name"], row["mn_ogrn"], row["mn_inn"], row["in_kpp"], row["mn_okpo"], ] ) buffer = io.BytesIO() workbook.save(buffer) workbook.close() return buffer.getvalue() def _extract_results(response_data): if hasattr(response_data, "get"): data = response_data.get("data") if isinstance(data, list): return data results = response_data.get("results") if results is not None: return results return response_data def _create_procurement_record() -> ProcurementRecord: return ProcurementRecord.objects.create( load_batch=fake.random_int(min=1, max=1000), purchase_number=_digits(19), purchase_name=fake.sentence(nb_words=6), customer_inn=_digits(10), customer_kpp=_digits(9), customer_ogrn=_digits(13), customer_name=fake.company(), max_price=str(fake.pydecimal(left_digits=7, right_digits=2, positive=True)), status=fake.word(), law_type="44-FZ", href=fake.url(), region_code=f"{fake.random_int(min=1, max=99):02d}", ) class AuthenticatedApiMixin: def authenticate(self, user): tokens = UserService.get_tokens_for_user(user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {tokens['access']}") return tokens class PublicApiInventoryE2ETest(APITestCase): def test_health_and_swagger_endpoints(self): swagger_response = self.client.get(reverse("schema-swagger-ui")) health_response = self.client.get(reverse("core:health")) live_response = self.client.get(reverse("core:liveness")) ready_response = self.client.get(reverse("core:readiness")) openapi_response = self.client.get( reverse("schema-swagger-ui"), {"format": "openapi"}, ) self.assertEqual(swagger_response.status_code, status.HTTP_200_OK) self.assertEqual(openapi_response.status_code, status.HTTP_200_OK) self.assertEqual(health_response.status_code, status.HTTP_200_OK) self.assertEqual(live_response.status_code, status.HTTP_200_OK) self.assertEqual(ready_response.status_code, status.HTTP_200_OK) class UserApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.admin = UserFactory.create_superuser() def test_auth_endpoints_and_disabled_self_service_endpoints(self): initial_password = fake.password(length=16, special_chars=False) disabled_registration_payload = { "email": fake.unique.email(), "username": fake.unique.user_name(), "password": initial_password, "password_confirm": initial_password, "phone": f"+7{fake.numerify('##########')}", "first_name": "Ivan", "middle_name": "Ivanovich", "last_name": "Ivanov", } user = UserFactory.create_user( username=fake.unique.user_name(), password=initial_password, ) register_response = self.client.post( reverse("api_v1:user:register"), disabled_registration_payload, format="json", ) self.assertEqual( register_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED, ) login_response = self.client.post( reverse("api_v1:user:login"), { "username": user.username, "password": initial_password, }, format="json", ) self.assertEqual(login_response.status_code, status.HTTP_200_OK) verify_response = self.client.post( reverse("api_v1:user:token_verify"), {"token": login_response.data["access"]}, format="json", ) self.assertEqual( verify_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED, ) refresh_response = self.client.post( reverse("api_v1:user:token_refresh"), {"refresh": login_response.data["refresh"]}, format="json", ) self.assertEqual(refresh_response.status_code, status.HTTP_200_OK) self.client.credentials( HTTP_AUTHORIZATION=f"Bearer {refresh_response.data['access']}" ) me_response = self.client.get(reverse("api_v1:user:current_user")) self.assertEqual(me_response.status_code, status.HTTP_200_OK) self.assertEqual(me_response.data["username"], user.username) me_update_response = self.client.patch( reverse("api_v1:user:user_update"), {"phone": f"+7{fake.numerify('##########')}"}, format="json", ) self.assertEqual( me_update_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED, ) profile_response = self.client.get(reverse("api_v1:user:profile_detail")) self.assertEqual( profile_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED, ) profile_patch_response = self.client.patch( reverse("api_v1:user:profile_detail"), { "first_name": "Petr", "middle_name": "Petrovich", "last_name": "Petrov", }, format="json", ) self.assertEqual( profile_patch_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED, ) profile_full_response = self.client.get(reverse("api_v1:user:profile_full")) self.assertEqual( profile_full_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED, ) password_change_response = self.client.post( reverse("api_v1:user:password_change"), { "old_password": initial_password, "new_password": f"{initial_password}1", "new_password_confirm": f"{initial_password}1", }, format="json", ) self.assertEqual( password_change_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED, ) logout_response = self.client.post( reverse("api_v1:user:logout"), {}, format="json", ) self.assertEqual(logout_response.status_code, status.HTTP_200_OK) def test_admin_user_management_endpoints(self): self.authenticate(self.admin) list_response = self.client.get(reverse("api_v1:user:admin-users")) self.assertEqual(list_response.status_code, status.HTTP_200_OK) self.assertEqual( set(list_response.data.keys()), {"count", "next", "previous", "results"}, ) self.assertTrue(list_response.data["results"]) self.assertIn("profile", list_response.data["results"][0]) create_payload = { "email": fake.unique.email(), "username": fake.unique.user_name(), "phone": f"+7{fake.numerify('##########')}", "password": "AdminManagedPass123", "role": "user", "first_name": "Alex", "middle_name": "Alexeevich", "last_name": "Alexeev", } create_response = self.client.post( reverse("api_v1:user:admin-users"), create_payload, format="json", ) self.assertEqual(create_response.status_code, status.HTTP_201_CREATED) managed_user_id = create_response.data["id"] detail_url = reverse("api_v1:user:admin-user-detail", args=[managed_user_id]) detail_response = self.client.get(detail_url) self.assertEqual(detail_response.status_code, status.HTTP_200_OK) patch_response = self.client.patch( detail_url, { "role": "admin", "first_name": "Sergey", "middle_name": "Sergeevich", "last_name": "Sergeev", }, format="json", ) self.assertEqual(patch_response.status_code, status.HTTP_200_OK) self.assertEqual(patch_response.data["role"], "admin") deactivate_response = self.client.post( reverse("api_v1:user:admin-user-deactivate", args=[managed_user_id]), {}, format="json", ) self.assertEqual(deactivate_response.status_code, status.HTTP_200_OK) activate_response = self.client.post( reverse("api_v1:user:admin-user-activate", args=[managed_user_id]), {}, format="json", ) self.assertEqual(activate_response.status_code, status.HTTP_200_OK) self.assertTrue(activate_response.data["is_active"]) class JobsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.user = UserFactory.create_user() def _create_job(self, *, task_id: str, status_value: str) -> BackgroundJob: started_at = timezone.now() completed_at = started_at + timedelta(seconds=3) return BackgroundJob.objects.create( task_id=task_id, task_name="apps.parsers.tasks.fake_task", status=status_value, user_id=self.user.id, started_at=started_at, completed_at=completed_at, progress=100, ) def test_jobs_endpoints(self): job = self._create_job(task_id="inventory-job-1", status_value="success") self.authenticate(self.user) list_response = self.client.get(reverse("api_v1:jobs:job-list")) self.assertEqual(list_response.status_code, status.HTTP_200_OK) self.assertEqual(list_response.data["results"][0]["task_id"], job.task_id) status_response = self.client.get( reverse("api_v1:jobs:job-status", kwargs={"task_id": job.task_id}) ) self.assertEqual(status_response.status_code, status.HTTP_200_OK) self.assertEqual(status_response.data["status"], "success") stream_response = self.client.get( reverse("api_v1:jobs:job-stream", kwargs={"task_id": job.task_id}) ) self.assertEqual(stream_response.status_code, status.HTTP_200_OK) stream_chunks = b"".join(stream_response.streaming_content).decode("utf-8") self.assertIn("event: completed", stream_chunks) self.assertIn(job.task_id, stream_chunks) class ParsersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.user = UserFactory.create_user() self.admin = UserFactory.create_superuser() def _create_financial_report(self) -> FinancialReport: report = FinancialReport.objects.create( external_id=_digits(5), ogrn=_digits(13), file_name=f"fin_{_digits(5)}_{_digits(13)}.xlsx", file_hash=fake.sha256(raw_output=False), load_batch=fake.random_int(min=1, max=1000), status=FinancialReport.Status.SUCCESS, source=FinancialReport.SourceType.API, ) FinancialReportLine.objects.create( report=report, form_code="1", line_code="1100", line_name="Assets", year=2025, period_start=100, period_end=200, ) return report def test_registry_read_endpoints_for_parsers_data(self): certificate = IndustrialCertificateRecordFactory() manufacturer = ManufacturerRecordFactory() product = IndustrialProductRecordFactory() inspection = InspectionRecordFactory() procurement = _create_procurement_record() report = self._create_financial_report() self.authenticate(self.user) cert_list = self.client.get(reverse("api_v1:minpromtorg:certificates-list")) cert_detail = self.client.get( reverse("api_v1:minpromtorg:certificates-detail", args=[certificate.id]) ) manufacturers_list = self.client.get( reverse("api_v1:minpromtorg:manufacturers-list") ) manufacturer_detail = self.client.get( reverse("api_v1:minpromtorg:manufacturers-detail", args=[manufacturer.id]) ) products_list = self.client.get( reverse("api_v1:minpromtorg:industrial-products-list") ) product_detail = self.client.get( reverse("api_v1:minpromtorg:industrial-products-detail", args=[product.id]) ) inspections_list = self.client.get(reverse("api_v1:proverki:inspections-list")) inspection_detail = self.client.get( reverse("api_v1:proverki:inspections-detail", args=[inspection.id]) ) procurements_list = self.client.get(reverse("api_v1:zakupki:procurements-list")) procurement_detail = self.client.get( reverse("api_v1:zakupki:procurements-detail", args=[procurement.id]) ) fns_reports_list = self.client.get(reverse("api_v1:fns:fns-reports-list")) fns_report_detail = self.client.get( reverse("api_v1:fns:fns-reports-detail", args=[report.id]) ) for response in ( cert_list, cert_detail, manufacturers_list, manufacturer_detail, products_list, product_detail, inspections_list, inspection_detail, procurements_list, procurement_detail, fns_reports_list, fns_report_detail, ): self.assertEqual(response.status_code, status.HTTP_200_OK) def test_sources_parsing_and_system_endpoints(self): shared_inn = _digits(10) IndustrialCertificateRecordFactory(inn=shared_inn) IndustrialProductRecordFactory(inn=shared_inn) ManufacturerRecordFactory(load_batch=777, inn=shared_inn) ParserLoadLogFactory( source=ParserLoadLog.Source.INDUSTRIAL, status="success", records_count=1, ) ParserLoadLogFactory( source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS, status="success", records_count=1, ) log = ParserLoadLogFactory( source=ParserLoadLog.Source.MANUFACTURES, batch_id=777, status="success", records_count=1, ) self.authenticate(self.user) sources_list = self.client.get(reverse("api_v1:sources:source-cards-list")) sources_statuses = self.client.get( reverse("api_v1:sources:source-cards-statuses") ) source_detail = self.client.get( reverse( "api_v1:sources:source-cards-detail", kwargs={"slug": "manufacturers-and-products"}, ) ) self.assertEqual(sources_list.status_code, status.HTTP_200_OK) self.assertEqual(sources_statuses.status_code, status.HTTP_200_OK) self.assertEqual(source_detail.status_code, status.HTTP_200_OK) main_dashboard = self.client.get(reverse("api_v1:stat:main-dashboard")) self.assertEqual(main_dashboard.status_code, status.HTTP_200_OK) self.assertIn("source_cards", main_dashboard.data["data"]) self.assertIn("organization_stats", main_dashboard.data["data"]) self.authenticate(self.admin) parsing_url = reverse("api_v1:parsing:parsing-settings") parsing_get = self.client.get(parsing_url) parsing_patch = self.client.patch( parsing_url, {"planned_inspections": "weekly"}, format="json", ) logs_list = self.client.get(reverse("api_v1:system:parser-logs-list")) logs_detail = self.client.get( reverse("api_v1:system:parser-logs-detail", args=[log.id]) ) logs_export = self.client.get(reverse("api_v1:system:parser-logs-export")) with TemporaryDirectory() as tmp_dir: base = Path(tmp_dir) refresh_response = self.client.post( reverse( "api_v1:sources:source-cards-refresh", kwargs={"slug": "financial-indicators"}, ), {}, format="json", ) upload = SimpleUploadedFile( f"fin_{_digits(5)}_{_digits(13)}.xlsx", _build_fns_excel_bytes(), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with self.settings( FNS_WATCH_DIRECTORY=str(base / "watch"), FNS_PROCESSED_DIRECTORY=str(base / "processed"), FNS_FAILED_DIRECTORY=str(base / "failed"), ): fns_upload = self.client.post( reverse("api_v1:fns:fns-upload"), {"file": upload}, format="multipart", ) for response in ( parsing_get, parsing_patch, logs_list, logs_detail, logs_export, refresh_response, fns_upload, ): self.assertIn( response.status_code, { status.HTTP_200_OK, status.HTTP_202_ACCEPTED, }, ) class RegistersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.user = UserFactory.create_user() self.admin = UserFactory.create_superuser() def test_registers_endpoints(self): registry = RegisterFactory(name="Inventory Registry") rows = [ { "pn_name": "Inventory Org", "mn_ogrn": "1027600980990", "mn_inn": "7601000086", "in_kpp": "760401001", "mn_okpo": "07506197", } ] self.authenticate(self.admin) upload = SimpleUploadedFile( "inventory-registry.xlsx", _build_register_excel_bytes(rows), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) upload_response = self.client.post( reverse("api_v1:registers:register-upload"), { "registry": str(registry.id), "actual_date": date(2026, 1, 1).isoformat(), "file": upload, }, format="multipart", ) self.assertEqual(upload_response.status_code, status.HTTP_201_CREATED) self.authenticate(self.user) registries_list = self.client.get(reverse("api_v1:registers:registries-list")) registries_detail = self.client.get( reverse("api_v1:registers:registries-detail", args=[registry.id]) ) organizations_list = self.client.get( reverse("api_v1:registers:organizations-list") ) organization_id = _extract_results(organizations_list.data)[0]["id"] organization_detail = self.client.get( reverse("api_v1:registers:organizations-detail", args=[organization_id]) ) registry_organizations = self.client.get( reverse( "api_v1:registers:registry-organizations-list", args=[registry.id], ) ) for response in ( registries_list, registries_detail, organizations_list, organization_detail, registry_organizations, ): self.assertEqual(response.status_code, status.HTTP_200_OK) class ExchangeApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.admin = UserFactory.create_superuser() @patch("apps.exchange.services.ExchangeConnectionService.validate_target_structure") @patch("apps.exchange.services.ExchangeConnectionService.prepare_target_structure") @patch("apps.exchange.services.ExchangeConnectionService.test_connection") @patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload") def test_exchange_endpoints( self, test_connection_payload_mock, _test_connection_mock, _prepare_mock, _validate_mock, ): self.authenticate(self.admin) connections_url = reverse("api_v1:exchange:connections") test_connection_url = reverse("api_v1:exchange:connections-test") copy_url = reverse("api_v1:exchange:copy") tables_url = reverse("api_v1:exchange:tables") periodic_tasks_url = reverse("api_v1:exchange:periodic-tasks") connection_payload = { "server": "127.0.0.1", "port": 5432, "username": "postgres", "password": "secret", "database_name": "target_db", "schema_name": "public", } test_connection_payload_mock.return_value = { "status": "success", "message": "ok", } _test_connection_mock.return_value = "exchange_target_test" list_connections = self.client.get(connections_url) create_connection = self.client.post( connections_url, connection_payload, format="json", ) connection_test = self.client.post( test_connection_url, connection_payload, format="json", ) active_connection = ExchangeConnection.objects.get( id=create_connection.data["id"] ) copy_response = self.client.post(copy_url, {"mode": "all"}, format="json") tables_response = self.client.get(tables_url) list_periodic = self.client.get(periodic_tasks_url) create_periodic = self.client.post( periodic_tasks_url, { "schedule_type": "interval", "interval_every": 1, "interval_period": "hours", "notify_on_error": True, }, format="json", ) periodic_id = create_periodic.data["id"] periodic_detail_url = reverse( "api_v1:exchange:periodic-task-detail", args=[periodic_id], ) detail_periodic = self.client.get(periodic_detail_url) patch_periodic = self.client.patch( periodic_detail_url, { "schedule_type": "daily", "crontab_hour": 2, "crontab_minute": 30, "notify_on_error": False, }, format="json", ) for response in ( list_connections, create_connection, connection_test, list_periodic, create_periodic, patch_periodic, ): self.assertIn( response.status_code, { status.HTTP_200_OK, status.HTTP_201_CREATED, }, ) self.assertEqual(copy_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED) self.assertEqual(tables_response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED) self.assertEqual( detail_periodic.status_code, status.HTTP_405_METHOD_NOT_ALLOWED, ) self.assertTrue(PeriodicTask.objects.filter(id=periodic_id).exists()) self.assertTrue( ExchangeConnection.objects.filter(id=active_connection.id).exists() ) class BackupsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.admin = UserFactory.create_superuser() @patch("apps.backups.views.StateCorpExchangeService.build_package") def test_backup_export_endpoint(self, build_package_mock): self.authenticate(self.admin) today = timezone.localdate() build_package_mock.return_value = SimpleNamespace( package_id="package-1", archive_name="state_corp_exchange.zip", bin_name="state_corp_exchange.bin", archive_bytes=b"zip-bytes", payload_counts={"organizations": 1}, produced_at="2026-05-12T10:00:00+00:00", ) response = self.client.post( reverse("api_v1:backups:export"), {"actual_date": today.isoformat()}, format="json", ) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEqual(response.content, b"zip-bytes") self.assertIn( 'attachment; filename="state_corp_exchange.zip"', response["Content-Disposition"], ) build_package_mock.assert_called_once_with(actual_date=today)