"""End-to-end smoke coverage for the full HTTP API inventory.""" from __future__ import annotations import io from datetime import date, timedelta from pathlib import Path from tempfile import TemporaryDirectory from types import SimpleNamespace from unittest.mock import patch from apps.backups.models import BackupExportJob from apps.core.models import BackgroundJob from apps.exchange.models import ExchangeConnection from apps.parsers.models import ( FinancialReport, FinancialReportLine, ParserLoadLog, ProcurementRecord, ) from apps.user.services import UserService from django.core.files.uploadedfile import SimpleUploadedFile from django.urls import reverse from django.utils import timezone from django_celery_beat.models import PeriodicTask from openpyxl import Workbook from rest_framework import status from rest_framework.test import APITestCase from tests.apps.parsers.factories import ( IndustrialCertificateRecordFactory, IndustrialProductRecordFactory, InspectionRecordFactory, ManufacturerRecordFactory, ParserLoadLogFactory, ) from tests.apps.registers.factories import RegisterFactory from tests.apps.user.factories import UserFactory from tests.utils.fixtures import fake def _digits(length: int) -> str: return "".join(str(fake.random_int(0, 9)) for _ in range(length)) def _build_fns_excel_bytes() -> bytes: workbook = Workbook() worksheet = workbook.active year = fake.random_int(min=2020, max=2025) worksheet.append(["Form", None, year, None]) worksheet.append([None, "Code", "Start", "End"]) worksheet.append( [fake.word(), _digits(4), fake.random_int(10, 999), fake.random_int(10, 999)] ) buffer = io.BytesIO() workbook.save(buffer) workbook.close() return buffer.getvalue() def _build_register_excel_bytes(rows: list[dict[str, str]]) -> bytes: workbook = Workbook() worksheet = workbook.active worksheet.append(["pn_name", "mn_ogrn", "mn_inn", "in_kpp", "mn_okpo"]) for row in rows: worksheet.append( [ row["pn_name"], row["mn_ogrn"], row["mn_inn"], row["in_kpp"], row["mn_okpo"], ] ) buffer = io.BytesIO() workbook.save(buffer) workbook.close() return buffer.getvalue() def _extract_results(response_data): if hasattr(response_data, "get"): data = response_data.get("data") if isinstance(data, list): return data results = response_data.get("results") if results is not None: return results return response_data def _create_procurement_record() -> ProcurementRecord: return ProcurementRecord.objects.create( load_batch=fake.random_int(min=1, max=1000), purchase_number=_digits(19), purchase_name=fake.sentence(nb_words=6), customer_inn=_digits(10), customer_kpp=_digits(9), customer_ogrn=_digits(13), customer_name=fake.company(), max_price=str(fake.pydecimal(left_digits=7, right_digits=2, positive=True)), status=fake.word(), law_type="44-FZ", href=fake.url(), region_code=f"{fake.random_int(min=1, max=99):02d}", ) class AuthenticatedApiMixin: def authenticate(self, user): tokens = UserService.get_tokens_for_user(user) self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {tokens['access']}") return tokens class PublicApiInventoryE2ETest(APITestCase): def test_health_and_swagger_endpoints(self): swagger_response = self.client.get(reverse("schema-swagger-ui")) health_response = self.client.get(reverse("core:health")) live_response = self.client.get(reverse("core:liveness")) ready_response = self.client.get(reverse("core:readiness")) self.assertEqual(swagger_response.status_code, status.HTTP_200_OK) self.assertEqual(health_response.status_code, status.HTTP_200_OK) self.assertEqual(live_response.status_code, status.HTTP_200_OK) self.assertEqual(ready_response.status_code, status.HTTP_200_OK) class UserApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.admin = UserFactory.create_superuser() def test_auth_and_profile_endpoints(self): initial_password = fake.password(length=16, special_chars=False) new_password = fake.password(length=18, special_chars=False) register_payload = { "email": fake.unique.email(), "username": fake.unique.user_name(), "password": initial_password, "password_confirm": initial_password, "phone": f"+7{fake.numerify('##########')}", "first_name": "Ivan", "middle_name": "Ivanovich", "last_name": "Ivanov", } register_response = self.client.post( reverse("api_v1:user:register"), register_payload, format="json", ) self.assertEqual(register_response.status_code, status.HTTP_201_CREATED) self.assertIn("tokens", register_response.data) login_response = self.client.post( reverse("api_v1:user:login"), { "username": register_payload["username"], "password": initial_password, }, format="json", ) self.assertEqual(login_response.status_code, status.HTTP_200_OK) verify_response = self.client.post( reverse("api_v1:user:token_verify"), {"token": login_response.data["access"]}, format="json", ) self.assertEqual(verify_response.status_code, status.HTTP_200_OK) refresh_response = self.client.post( reverse("api_v1:user:token_refresh"), {"refresh": login_response.data["refresh"]}, format="json", ) self.assertEqual(refresh_response.status_code, status.HTTP_200_OK) self.client.credentials( HTTP_AUTHORIZATION=f"Bearer {refresh_response.data['access']}" ) me_response = self.client.get(reverse("api_v1:user:current_user")) self.assertEqual(me_response.status_code, status.HTTP_200_OK) self.assertEqual(me_response.data["username"], register_payload["username"]) me_update_response = self.client.patch( reverse("api_v1:user:user_update"), {"phone": f"+7{fake.numerify('##########')}"}, format="json", ) self.assertEqual(me_update_response.status_code, status.HTTP_200_OK) profile_response = self.client.get(reverse("api_v1:user:profile_detail")) self.assertEqual(profile_response.status_code, status.HTTP_200_OK) profile_patch_response = self.client.patch( reverse("api_v1:user:profile_detail"), { "first_name": "Petr", "middle_name": "Petrovich", "last_name": "Petrov", }, format="json", ) self.assertEqual(profile_patch_response.status_code, status.HTTP_200_OK) profile_full_response = self.client.get(reverse("api_v1:user:profile_full")) self.assertEqual(profile_full_response.status_code, status.HTTP_200_OK) self.assertEqual( profile_full_response.data["username"], register_payload["username"] ) password_change_response = self.client.post( reverse("api_v1:user:password_change"), { "old_password": initial_password, "new_password": new_password, "new_password_confirm": new_password, }, format="json", ) self.assertEqual(password_change_response.status_code, status.HTTP_200_OK) relogin_response = self.client.post( reverse("api_v1:user:login"), { "username": register_payload["username"], "password": new_password, }, format="json", ) self.assertEqual(relogin_response.status_code, status.HTTP_200_OK) self.client.credentials( HTTP_AUTHORIZATION=f"Bearer {relogin_response.data['access']}" ) logout_response = self.client.post( reverse("api_v1:user:logout"), {}, format="json", ) self.assertEqual(logout_response.status_code, status.HTTP_200_OK) def test_admin_user_management_endpoints(self): self.authenticate(self.admin) list_response = self.client.get(reverse("api_v1:user:admin-users")) self.assertEqual(list_response.status_code, status.HTTP_200_OK) self.assertEqual( set(list_response.data.keys()), {"count", "next", "previous", "results"}, ) self.assertTrue(list_response.data["results"]) self.assertIn("profile", list_response.data["results"][0]) create_payload = { "email": fake.unique.email(), "username": fake.unique.user_name(), "phone": f"+7{fake.numerify('##########')}", "password": "AdminManagedPass123", "role": "user", "first_name": "Alex", "middle_name": "Alexeevich", "last_name": "Alexeev", } create_response = self.client.post( reverse("api_v1:user:admin-users"), create_payload, format="json", ) self.assertEqual(create_response.status_code, status.HTTP_201_CREATED) managed_user_id = create_response.data["id"] detail_url = reverse("api_v1:user:admin-user-detail", args=[managed_user_id]) detail_response = self.client.get(detail_url) self.assertEqual(detail_response.status_code, status.HTTP_200_OK) patch_response = self.client.patch( detail_url, { "role": "admin", "first_name": "Sergey", "middle_name": "Sergeevich", "last_name": "Sergeev", }, format="json", ) self.assertEqual(patch_response.status_code, status.HTTP_200_OK) self.assertEqual(patch_response.data["role"], "admin") deactivate_response = self.client.post( reverse("api_v1:user:admin-user-deactivate", args=[managed_user_id]), {}, format="json", ) self.assertEqual(deactivate_response.status_code, status.HTTP_200_OK) activate_response = self.client.post( reverse("api_v1:user:admin-user-activate", args=[managed_user_id]), {}, format="json", ) self.assertEqual(activate_response.status_code, status.HTTP_200_OK) self.assertTrue(activate_response.data["is_active"]) class JobsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.user = UserFactory.create_user() def _create_job(self, *, task_id: str, status_value: str) -> BackgroundJob: started_at = timezone.now() completed_at = started_at + timedelta(seconds=3) return BackgroundJob.objects.create( task_id=task_id, task_name="apps.parsers.tasks.fake_task", status=status_value, user_id=self.user.id, started_at=started_at, completed_at=completed_at, progress=100, ) def test_jobs_endpoints(self): job = self._create_job(task_id="inventory-job-1", status_value="success") self.authenticate(self.user) list_response = self.client.get(reverse("api_v1:jobs:job-list")) self.assertEqual(list_response.status_code, status.HTTP_200_OK) self.assertEqual(list_response.data["results"][0]["task_id"], job.task_id) status_response = self.client.get( reverse("api_v1:jobs:job-status", kwargs={"task_id": job.task_id}) ) self.assertEqual(status_response.status_code, status.HTTP_200_OK) self.assertEqual(status_response.data["status"], "success") stream_response = self.client.get( reverse("api_v1:jobs:job-stream", kwargs={"task_id": job.task_id}) ) self.assertEqual(stream_response.status_code, status.HTTP_200_OK) stream_chunks = b"".join(stream_response.streaming_content).decode("utf-8") self.assertIn("event: completed", stream_chunks) self.assertIn(job.task_id, stream_chunks) class ParsersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.user = UserFactory.create_user() self.admin = UserFactory.create_superuser() def _create_financial_report(self) -> FinancialReport: report = FinancialReport.objects.create( external_id=_digits(5), ogrn=_digits(13), file_name=f"fin_{_digits(5)}_{_digits(13)}.xlsx", file_hash=fake.sha256(raw_output=False), load_batch=fake.random_int(min=1, max=1000), status=FinancialReport.Status.SUCCESS, source=FinancialReport.SourceType.API, ) FinancialReportLine.objects.create( report=report, form_code="1", line_code="1100", line_name="Assets", year=2025, period_start=100, period_end=200, ) return report def test_registry_read_endpoints_for_parsers_data(self): certificate = IndustrialCertificateRecordFactory() manufacturer = ManufacturerRecordFactory() product = IndustrialProductRecordFactory() inspection = InspectionRecordFactory() procurement = _create_procurement_record() report = self._create_financial_report() self.authenticate(self.user) cert_list = self.client.get(reverse("api_v1:minpromtorg:certificates-list")) cert_detail = self.client.get( reverse("api_v1:minpromtorg:certificates-detail", args=[certificate.id]) ) manufacturers_list = self.client.get( reverse("api_v1:minpromtorg:manufacturers-list") ) manufacturer_detail = self.client.get( reverse("api_v1:minpromtorg:manufacturers-detail", args=[manufacturer.id]) ) products_list = self.client.get( reverse("api_v1:minpromtorg:industrial-products-list") ) product_detail = self.client.get( reverse("api_v1:minpromtorg:industrial-products-detail", args=[product.id]) ) inspections_list = self.client.get(reverse("api_v1:proverki:inspections-list")) inspection_detail = self.client.get( reverse("api_v1:proverki:inspections-detail", args=[inspection.id]) ) procurements_list = self.client.get(reverse("api_v1:zakupki:procurements-list")) procurement_detail = self.client.get( reverse("api_v1:zakupki:procurements-detail", args=[procurement.id]) ) fns_reports_list = self.client.get(reverse("api_v1:fns:fns-reports-list")) fns_report_detail = self.client.get( reverse("api_v1:fns:fns-reports-detail", args=[report.id]) ) for response in ( cert_list, cert_detail, manufacturers_list, manufacturer_detail, products_list, product_detail, inspections_list, inspection_detail, procurements_list, procurement_detail, fns_reports_list, fns_report_detail, ): self.assertEqual(response.status_code, status.HTTP_200_OK) def test_sources_parsing_and_system_endpoints(self): shared_inn = _digits(10) IndustrialCertificateRecordFactory(inn=shared_inn) IndustrialProductRecordFactory(inn=shared_inn) ManufacturerRecordFactory(load_batch=777, inn=shared_inn) ParserLoadLogFactory( source=ParserLoadLog.Source.INDUSTRIAL, status="success", records_count=1, ) ParserLoadLogFactory( source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS, status="success", records_count=1, ) log = ParserLoadLogFactory( source=ParserLoadLog.Source.MANUFACTURES, batch_id=777, status="success", records_count=1, ) self.authenticate(self.user) sources_list = self.client.get(reverse("api_v1:sources:source-cards-list")) sources_statuses = self.client.get( reverse("api_v1:sources:source-cards-statuses") ) source_detail = self.client.get( reverse( "api_v1:sources:source-cards-detail", kwargs={"slug": "manufacturers-and-products"}, ) ) self.assertEqual(sources_list.status_code, status.HTTP_200_OK) self.assertEqual(sources_statuses.status_code, status.HTTP_200_OK) self.assertEqual(source_detail.status_code, status.HTTP_200_OK) self.authenticate(self.admin) parsing_url = reverse("api_v1:parsing:parsing-settings") parsing_get = self.client.get(parsing_url) parsing_patch = self.client.patch( parsing_url, {"planned_inspections": "weekly"}, format="json", ) logs_list = self.client.get(reverse("api_v1:system:parser-logs-list")) logs_detail = self.client.get( reverse("api_v1:system:parser-logs-detail", args=[log.id]) ) logs_export = self.client.get(reverse("api_v1:system:parser-logs-export")) with TemporaryDirectory() as tmp_dir: base = Path(tmp_dir) refresh_response = self.client.post( reverse( "api_v1:sources:source-cards-refresh", kwargs={"slug": "financial-indicators"}, ), {}, format="json", ) upload = SimpleUploadedFile( f"fin_{_digits(5)}_{_digits(13)}.xlsx", _build_fns_excel_bytes(), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) with self.settings( FNS_WATCH_DIRECTORY=str(base / "watch"), FNS_PROCESSED_DIRECTORY=str(base / "processed"), FNS_FAILED_DIRECTORY=str(base / "failed"), ): fns_upload = self.client.post( reverse("api_v1:fns:fns-upload"), {"file": upload}, format="multipart", ) for response in ( parsing_get, parsing_patch, logs_list, logs_detail, logs_export, refresh_response, fns_upload, ): self.assertIn( response.status_code, { status.HTTP_200_OK, status.HTTP_202_ACCEPTED, }, ) class RegistersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.user = UserFactory.create_user() self.admin = UserFactory.create_superuser() def test_registers_endpoints(self): registry = RegisterFactory(name="Inventory Registry") rows = [ { "pn_name": "Inventory Org", "mn_ogrn": "1027600980990", "mn_inn": "7601000086", "in_kpp": "760401001", "mn_okpo": "07506197", } ] self.authenticate(self.admin) upload = SimpleUploadedFile( "inventory-registry.xlsx", _build_register_excel_bytes(rows), content_type=( "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" ), ) upload_response = self.client.post( reverse("api_v1:registers:register-upload"), { "registry": str(registry.id), "actual_date": date(2026, 1, 1).isoformat(), "file": upload, }, format="multipart", ) self.assertEqual(upload_response.status_code, status.HTTP_201_CREATED) self.authenticate(self.user) registries_list = self.client.get(reverse("api_v1:registers:registries-list")) registries_detail = self.client.get( reverse("api_v1:registers:registries-detail", args=[registry.id]) ) organizations_list = self.client.get( reverse("api_v1:registers:organizations-list") ) organization_id = _extract_results(organizations_list.data)[0]["id"] organization_detail = self.client.get( reverse("api_v1:registers:organizations-detail", args=[organization_id]) ) registry_organizations = self.client.get( reverse( "api_v1:registers:registry-organizations-list", args=[registry.id], ) ) for response in ( registries_list, registries_detail, organizations_list, organization_detail, registry_organizations, ): self.assertEqual(response.status_code, status.HTTP_200_OK) class ExchangeApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.admin = UserFactory.create_superuser() @patch("apps.exchange.services.ExchangeConnectionService.validate_target_structure") @patch("apps.exchange.services.ExchangeConnectionService.test_connection") @patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload") @patch("apps.exchange.views.copy_parsers_data_async.delay") @patch("apps.exchange.services.ExchangeConnectionService.get_active_connection") def test_exchange_endpoints( self, get_active_connection_mock, delay_mock, test_connection_payload_mock, _test_connection_mock, _validate_mock, ): self.authenticate(self.admin) connections_url = reverse("api_v1:exchange:connections") test_connection_url = reverse("api_v1:exchange:connections-test") copy_url = reverse("api_v1:exchange:copy") periodic_tasks_url = reverse("api_v1:exchange:periodic-tasks") connection_payload = { "server": "127.0.0.1", "port": 5432, "username": "postgres", "password": "secret", "database_name": "target_db", "schema_name": "public", } test_connection_payload_mock.return_value = { "status": "success", "message": "ok", } list_connections = self.client.get(connections_url) create_connection = self.client.post( connections_url, connection_payload, format="json", ) connection_test = self.client.post( test_connection_url, connection_payload, format="json", ) active_connection = ExchangeConnection.objects.get( id=create_connection.data["id"] ) get_active_connection_mock.return_value = active_connection delay_mock.return_value = SimpleNamespace(id="exchange-task-1") copy_response = self.client.post(copy_url, {"mode": "all"}, format="json") list_periodic = self.client.get(periodic_tasks_url) create_periodic = self.client.post( periodic_tasks_url, { "schedule_type": "interval", "interval_every": 1, "interval_period": "hours", "notify_on_error": True, }, format="json", ) periodic_id = create_periodic.data["id"] periodic_detail_url = reverse( "api_v1:exchange:periodic-task-detail", args=[periodic_id], ) detail_periodic = self.client.get(periodic_detail_url) patch_periodic = self.client.patch( periodic_detail_url, { "schedule_type": "daily", "crontab_hour": 2, "crontab_minute": 30, "notify_on_error": False, }, format="json", ) for response in ( list_connections, create_connection, connection_test, copy_response, list_periodic, create_periodic, detail_periodic, patch_periodic, ): self.assertIn( response.status_code, { status.HTTP_200_OK, status.HTTP_201_CREATED, status.HTTP_202_ACCEPTED, }, ) self.assertTrue(PeriodicTask.objects.filter(id=periodic_id).exists()) self.assertTrue( ExchangeConnection.objects.filter(id=active_connection.id).exists() ) class BackupsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase): def setUp(self): self.admin = UserFactory.create_superuser() @patch("apps.backups.services.BackupExportJobService._enqueue_backup_task") def test_backup_export_endpoint(self, enqueue_mock): self.authenticate(self.admin) today = timezone.localdate() with self.captureOnCommitCallbacks(execute=True): response = self.client.post( reverse("api_v1:backups:export"), {"actual_date": today.isoformat()}, format="json", ) self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) self.assertEqual(response.data["status"], "started") self.assertTrue(BackupExportJob.objects.filter(actual_date=today).exists()) enqueue_mock.assert_called_once()