test(api): add full inventory e2e smoke coverage
This commit is contained in:
@@ -44,7 +44,15 @@ class ProxyAdmin(admin.ModelAdmin):
|
||||
fieldsets = (
|
||||
(
|
||||
"Основное",
|
||||
{"fields": ("address", "country_code", "source", "description", "is_active")},
|
||||
{
|
||||
"fields": (
|
||||
"address",
|
||||
"country_code",
|
||||
"source",
|
||||
"description",
|
||||
"is_active",
|
||||
)
|
||||
},
|
||||
),
|
||||
("Статистика", {"fields": ("fail_count", "last_used_at")}),
|
||||
("Даты", {"fields": ("created_at", "updated_at"), "classes": ("collapse",)}),
|
||||
|
||||
@@ -428,6 +428,7 @@ class ParserLoadLogListSerializer(serializers.Serializer):
|
||||
created_at = serializers.DateTimeField(read_only=True)
|
||||
updated_at = serializers.DateTimeField(read_only=True)
|
||||
|
||||
|
||||
class SourceCardRefreshParamSerializer(serializers.Serializer):
|
||||
"""Описание параметра ручного обновления карточки источника."""
|
||||
|
||||
|
||||
@@ -841,7 +841,9 @@ class ProxyToolsSyncService:
|
||||
max_pages = max(int(getattr(settings, "PROXY_TOOLS_MAX_PAGES", 3)), 1)
|
||||
|
||||
try:
|
||||
items = cls._fetch_all_pages(client=client, limit=limit, max_pages=max_pages)
|
||||
items = cls._fetch_all_pages(
|
||||
client=client, limit=limit, max_pages=max_pages
|
||||
)
|
||||
addresses = cls._extract_addresses(items)
|
||||
except ProxyToolsClientError as exc:
|
||||
raise ProxyToolsSyncError(str(exc)) from exc
|
||||
@@ -871,7 +873,9 @@ class ProxyToolsSyncService:
|
||||
)
|
||||
batch = cls._extract_items(payload)
|
||||
items.extend(batch)
|
||||
if not cls._has_more_pages(payload, page=page, batch_size=len(batch), limit=limit):
|
||||
if not cls._has_more_pages(
|
||||
payload, page=page, batch_size=len(batch), limit=limit
|
||||
):
|
||||
break
|
||||
return items
|
||||
|
||||
|
||||
713
tests/test_api_inventory_e2e.py
Normal file
713
tests/test_api_inventory_e2e.py
Normal file
@@ -0,0 +1,713 @@
|
||||
"""End-to-end smoke coverage for the full HTTP API inventory."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
from datetime import date, timedelta
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import patch
|
||||
|
||||
from apps.backups.models import BackupExportJob
|
||||
from apps.core.models import BackgroundJob
|
||||
from apps.exchange.models import ExchangeConnection
|
||||
from apps.parsers.models import (
|
||||
FinancialReport,
|
||||
FinancialReportLine,
|
||||
ParserLoadLog,
|
||||
ProcurementRecord,
|
||||
)
|
||||
from apps.user.services import UserService
|
||||
from django.core.files.uploadedfile import SimpleUploadedFile
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django_celery_beat.models import PeriodicTask
|
||||
from openpyxl import Workbook
|
||||
from rest_framework import status
|
||||
from rest_framework.test import APITestCase
|
||||
|
||||
from tests.apps.parsers.factories import (
|
||||
IndustrialCertificateRecordFactory,
|
||||
IndustrialProductRecordFactory,
|
||||
InspectionRecordFactory,
|
||||
ManufacturerRecordFactory,
|
||||
ParserLoadLogFactory,
|
||||
)
|
||||
from tests.apps.registers.factories import RegisterFactory
|
||||
from tests.apps.user.factories import UserFactory
|
||||
from tests.utils.fixtures import fake
|
||||
|
||||
|
||||
def _digits(length: int) -> str:
|
||||
return "".join(str(fake.random_int(0, 9)) for _ in range(length))
|
||||
|
||||
|
||||
def _build_fns_excel_bytes() -> bytes:
|
||||
workbook = Workbook()
|
||||
worksheet = workbook.active
|
||||
year = fake.random_int(min=2020, max=2025)
|
||||
worksheet.append(["Form", None, year, None])
|
||||
worksheet.append([None, "Code", "Start", "End"])
|
||||
worksheet.append(
|
||||
[fake.word(), _digits(4), fake.random_int(10, 999), fake.random_int(10, 999)]
|
||||
)
|
||||
buffer = io.BytesIO()
|
||||
workbook.save(buffer)
|
||||
workbook.close()
|
||||
return buffer.getvalue()
|
||||
|
||||
|
||||
def _build_register_excel_bytes(rows: list[dict[str, str]]) -> bytes:
|
||||
workbook = Workbook()
|
||||
worksheet = workbook.active
|
||||
worksheet.append(["pn_name", "mn_ogrn", "mn_inn", "in_kpp", "mn_okpo"])
|
||||
for row in rows:
|
||||
worksheet.append(
|
||||
[
|
||||
row["pn_name"],
|
||||
row["mn_ogrn"],
|
||||
row["mn_inn"],
|
||||
row["in_kpp"],
|
||||
row["mn_okpo"],
|
||||
]
|
||||
)
|
||||
buffer = io.BytesIO()
|
||||
workbook.save(buffer)
|
||||
workbook.close()
|
||||
return buffer.getvalue()
|
||||
|
||||
|
||||
def _extract_results(response_data):
|
||||
if hasattr(response_data, "get"):
|
||||
data = response_data.get("data")
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
results = response_data.get("results")
|
||||
if results is not None:
|
||||
return results
|
||||
return response_data
|
||||
|
||||
|
||||
def _create_procurement_record() -> ProcurementRecord:
|
||||
return ProcurementRecord.objects.create(
|
||||
load_batch=fake.random_int(min=1, max=1000),
|
||||
purchase_number=_digits(19),
|
||||
purchase_name=fake.sentence(nb_words=6),
|
||||
customer_inn=_digits(10),
|
||||
customer_kpp=_digits(9),
|
||||
customer_ogrn=_digits(13),
|
||||
customer_name=fake.company(),
|
||||
max_price=str(fake.pydecimal(left_digits=7, right_digits=2, positive=True)),
|
||||
status=fake.word(),
|
||||
law_type="44-FZ",
|
||||
href=fake.url(),
|
||||
region_code=f"{fake.random_int(min=1, max=99):02d}",
|
||||
)
|
||||
|
||||
|
||||
class AuthenticatedApiMixin:
|
||||
def authenticate(self, user):
|
||||
tokens = UserService.get_tokens_for_user(user)
|
||||
self.client.credentials(HTTP_AUTHORIZATION=f"Bearer {tokens['access']}")
|
||||
return tokens
|
||||
|
||||
|
||||
class PublicApiInventoryE2ETest(APITestCase):
|
||||
def test_health_and_swagger_endpoints(self):
|
||||
swagger_response = self.client.get(reverse("schema-swagger-ui"))
|
||||
health_response = self.client.get(reverse("core:health"))
|
||||
live_response = self.client.get(reverse("core:liveness"))
|
||||
ready_response = self.client.get(reverse("core:readiness"))
|
||||
|
||||
self.assertEqual(swagger_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(health_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(live_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(ready_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
|
||||
class UserApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
|
||||
def setUp(self):
|
||||
self.admin = UserFactory.create_superuser()
|
||||
|
||||
def test_auth_and_profile_endpoints(self):
|
||||
initial_password = fake.password(length=16, special_chars=False)
|
||||
new_password = fake.password(length=18, special_chars=False)
|
||||
register_payload = {
|
||||
"email": fake.unique.email(),
|
||||
"username": fake.unique.user_name(),
|
||||
"password": initial_password,
|
||||
"password_confirm": initial_password,
|
||||
"phone": f"+7{fake.numerify('##########')}",
|
||||
"first_name": "Ivan",
|
||||
"middle_name": "Ivanovich",
|
||||
"last_name": "Ivanov",
|
||||
}
|
||||
|
||||
register_response = self.client.post(
|
||||
reverse("api_v1:user:register"),
|
||||
register_payload,
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(register_response.status_code, status.HTTP_201_CREATED)
|
||||
self.assertIn("tokens", register_response.data)
|
||||
|
||||
login_response = self.client.post(
|
||||
reverse("api_v1:user:login"),
|
||||
{
|
||||
"username": register_payload["username"],
|
||||
"password": initial_password,
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(login_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
verify_response = self.client.post(
|
||||
reverse("api_v1:user:token_verify"),
|
||||
{"token": login_response.data["access"]},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(verify_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
refresh_response = self.client.post(
|
||||
reverse("api_v1:user:token_refresh"),
|
||||
{"refresh": login_response.data["refresh"]},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(refresh_response.status_code, status.HTTP_200_OK)
|
||||
self.client.credentials(
|
||||
HTTP_AUTHORIZATION=f"Bearer {refresh_response.data['access']}"
|
||||
)
|
||||
|
||||
me_response = self.client.get(reverse("api_v1:user:current_user"))
|
||||
self.assertEqual(me_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(me_response.data["username"], register_payload["username"])
|
||||
|
||||
me_update_response = self.client.patch(
|
||||
reverse("api_v1:user:user_update"),
|
||||
{"phone": f"+7{fake.numerify('##########')}"},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(me_update_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
profile_response = self.client.get(reverse("api_v1:user:profile_detail"))
|
||||
self.assertEqual(profile_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
profile_patch_response = self.client.patch(
|
||||
reverse("api_v1:user:profile_detail"),
|
||||
{
|
||||
"first_name": "Petr",
|
||||
"middle_name": "Petrovich",
|
||||
"last_name": "Petrov",
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(profile_patch_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
profile_full_response = self.client.get(reverse("api_v1:user:profile_full"))
|
||||
self.assertEqual(profile_full_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(profile_full_response.data["username"], register_payload["username"])
|
||||
|
||||
password_change_response = self.client.post(
|
||||
reverse("api_v1:user:password_change"),
|
||||
{
|
||||
"old_password": initial_password,
|
||||
"new_password": new_password,
|
||||
"new_password_confirm": new_password,
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(password_change_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
relogin_response = self.client.post(
|
||||
reverse("api_v1:user:login"),
|
||||
{
|
||||
"username": register_payload["username"],
|
||||
"password": new_password,
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(relogin_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
self.client.credentials(
|
||||
HTTP_AUTHORIZATION=f"Bearer {relogin_response.data['access']}"
|
||||
)
|
||||
logout_response = self.client.post(
|
||||
reverse("api_v1:user:logout"),
|
||||
{},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(logout_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_admin_user_management_endpoints(self):
|
||||
self.authenticate(self.admin)
|
||||
list_response = self.client.get(reverse("api_v1:user:admin-users"))
|
||||
self.assertEqual(list_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
create_payload = {
|
||||
"email": fake.unique.email(),
|
||||
"username": fake.unique.user_name(),
|
||||
"phone": f"+7{fake.numerify('##########')}",
|
||||
"password": "AdminManagedPass123",
|
||||
"role": "user",
|
||||
"first_name": "Alex",
|
||||
"middle_name": "Alexeevich",
|
||||
"last_name": "Alexeev",
|
||||
}
|
||||
create_response = self.client.post(
|
||||
reverse("api_v1:user:admin-users"),
|
||||
create_payload,
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(create_response.status_code, status.HTTP_201_CREATED)
|
||||
managed_user_id = create_response.data["id"]
|
||||
|
||||
detail_url = reverse("api_v1:user:admin-user-detail", args=[managed_user_id])
|
||||
detail_response = self.client.get(detail_url)
|
||||
self.assertEqual(detail_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
patch_response = self.client.patch(
|
||||
detail_url,
|
||||
{
|
||||
"role": "admin",
|
||||
"first_name": "Sergey",
|
||||
"middle_name": "Sergeevich",
|
||||
"last_name": "Sergeev",
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(patch_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(patch_response.data["role"], "admin")
|
||||
|
||||
deactivate_response = self.client.post(
|
||||
reverse("api_v1:user:admin-user-deactivate", args=[managed_user_id]),
|
||||
{},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(deactivate_response.status_code, status.HTTP_200_OK)
|
||||
|
||||
activate_response = self.client.post(
|
||||
reverse("api_v1:user:admin-user-activate", args=[managed_user_id]),
|
||||
{},
|
||||
format="json",
|
||||
)
|
||||
self.assertEqual(activate_response.status_code, status.HTTP_200_OK)
|
||||
self.assertTrue(activate_response.data["is_active"])
|
||||
|
||||
|
||||
class JobsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
|
||||
def setUp(self):
|
||||
self.user = UserFactory.create_user()
|
||||
|
||||
def _create_job(self, *, task_id: str, status_value: str) -> BackgroundJob:
|
||||
started_at = timezone.now()
|
||||
completed_at = started_at + timedelta(seconds=3)
|
||||
return BackgroundJob.objects.create(
|
||||
task_id=task_id,
|
||||
task_name="apps.parsers.tasks.fake_task",
|
||||
status=status_value,
|
||||
user_id=self.user.id,
|
||||
started_at=started_at,
|
||||
completed_at=completed_at,
|
||||
progress=100,
|
||||
)
|
||||
|
||||
def test_jobs_endpoints(self):
|
||||
job = self._create_job(task_id="inventory-job-1", status_value="success")
|
||||
self.authenticate(self.user)
|
||||
|
||||
list_response = self.client.get(reverse("api_v1:jobs:job-list"))
|
||||
self.assertEqual(list_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(list_response.data["results"][0]["task_id"], job.task_id)
|
||||
|
||||
status_response = self.client.get(
|
||||
reverse("api_v1:jobs:job-status", kwargs={"task_id": job.task_id})
|
||||
)
|
||||
self.assertEqual(status_response.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(status_response.data["status"], "success")
|
||||
|
||||
stream_response = self.client.get(
|
||||
reverse("api_v1:jobs:job-stream", kwargs={"task_id": job.task_id})
|
||||
)
|
||||
self.assertEqual(stream_response.status_code, status.HTTP_200_OK)
|
||||
stream_chunks = b"".join(stream_response.streaming_content).decode("utf-8")
|
||||
self.assertIn("event: completed", stream_chunks)
|
||||
self.assertIn(job.task_id, stream_chunks)
|
||||
|
||||
|
||||
class ParsersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
|
||||
def setUp(self):
|
||||
self.user = UserFactory.create_user()
|
||||
self.admin = UserFactory.create_superuser()
|
||||
|
||||
def _create_financial_report(self) -> FinancialReport:
|
||||
report = FinancialReport.objects.create(
|
||||
external_id=_digits(5),
|
||||
ogrn=_digits(13),
|
||||
file_name=f"fin_{_digits(5)}_{_digits(13)}.xlsx",
|
||||
file_hash=fake.sha256(raw_output=False),
|
||||
load_batch=fake.random_int(min=1, max=1000),
|
||||
status=FinancialReport.Status.SUCCESS,
|
||||
source=FinancialReport.SourceType.API,
|
||||
)
|
||||
FinancialReportLine.objects.create(
|
||||
report=report,
|
||||
form_code="1",
|
||||
line_code="1100",
|
||||
line_name="Assets",
|
||||
year=2025,
|
||||
period_start=100,
|
||||
period_end=200,
|
||||
)
|
||||
return report
|
||||
|
||||
def test_registry_read_endpoints_for_parsers_data(self):
|
||||
certificate = IndustrialCertificateRecordFactory()
|
||||
manufacturer = ManufacturerRecordFactory()
|
||||
product = IndustrialProductRecordFactory()
|
||||
inspection = InspectionRecordFactory()
|
||||
procurement = _create_procurement_record()
|
||||
report = self._create_financial_report()
|
||||
|
||||
self.authenticate(self.user)
|
||||
|
||||
cert_list = self.client.get(reverse("api_v1:minpromtorg:certificates-list"))
|
||||
cert_detail = self.client.get(
|
||||
reverse("api_v1:minpromtorg:certificates-detail", args=[certificate.id])
|
||||
)
|
||||
manufacturers_list = self.client.get(
|
||||
reverse("api_v1:minpromtorg:manufacturers-list")
|
||||
)
|
||||
manufacturer_detail = self.client.get(
|
||||
reverse("api_v1:minpromtorg:manufacturers-detail", args=[manufacturer.id])
|
||||
)
|
||||
products_list = self.client.get(
|
||||
reverse("api_v1:minpromtorg:industrial-products-list")
|
||||
)
|
||||
product_detail = self.client.get(
|
||||
reverse("api_v1:minpromtorg:industrial-products-detail", args=[product.id])
|
||||
)
|
||||
inspections_list = self.client.get(reverse("api_v1:proverki:inspections-list"))
|
||||
inspection_detail = self.client.get(
|
||||
reverse("api_v1:proverki:inspections-detail", args=[inspection.id])
|
||||
)
|
||||
procurements_list = self.client.get(reverse("api_v1:zakupki:procurements-list"))
|
||||
procurement_detail = self.client.get(
|
||||
reverse("api_v1:zakupki:procurements-detail", args=[procurement.id])
|
||||
)
|
||||
fns_reports_list = self.client.get(reverse("api_v1:fns:fns-reports-list"))
|
||||
fns_report_detail = self.client.get(
|
||||
reverse("api_v1:fns:fns-reports-detail", args=[report.id])
|
||||
)
|
||||
|
||||
for response in (
|
||||
cert_list,
|
||||
cert_detail,
|
||||
manufacturers_list,
|
||||
manufacturer_detail,
|
||||
products_list,
|
||||
product_detail,
|
||||
inspections_list,
|
||||
inspection_detail,
|
||||
procurements_list,
|
||||
procurement_detail,
|
||||
fns_reports_list,
|
||||
fns_report_detail,
|
||||
):
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
def test_sources_parsing_and_system_endpoints(self):
|
||||
shared_inn = _digits(10)
|
||||
IndustrialCertificateRecordFactory(inn=shared_inn)
|
||||
IndustrialProductRecordFactory(inn=shared_inn)
|
||||
ManufacturerRecordFactory(load_batch=777, inn=shared_inn)
|
||||
ParserLoadLogFactory(
|
||||
source=ParserLoadLog.Source.INDUSTRIAL,
|
||||
status="success",
|
||||
records_count=1,
|
||||
)
|
||||
ParserLoadLogFactory(
|
||||
source=ParserLoadLog.Source.INDUSTRIAL_PRODUCTS,
|
||||
status="success",
|
||||
records_count=1,
|
||||
)
|
||||
log = ParserLoadLogFactory(
|
||||
source=ParserLoadLog.Source.MANUFACTURES,
|
||||
batch_id=777,
|
||||
status="success",
|
||||
records_count=1,
|
||||
)
|
||||
|
||||
self.authenticate(self.user)
|
||||
sources_list = self.client.get(reverse("api_v1:sources:source-cards-list"))
|
||||
sources_statuses = self.client.get(
|
||||
reverse("api_v1:sources:source-cards-statuses")
|
||||
)
|
||||
source_detail = self.client.get(
|
||||
reverse(
|
||||
"api_v1:sources:source-cards-detail",
|
||||
kwargs={"slug": "manufacturers-and-products"},
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(sources_list.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(sources_statuses.status_code, status.HTTP_200_OK)
|
||||
self.assertEqual(source_detail.status_code, status.HTTP_200_OK)
|
||||
|
||||
self.authenticate(self.admin)
|
||||
parsing_url = reverse("api_v1:parsing:parsing-settings")
|
||||
parsing_get = self.client.get(parsing_url)
|
||||
parsing_patch = self.client.patch(
|
||||
parsing_url,
|
||||
{"planned_inspections": "weekly"},
|
||||
format="json",
|
||||
)
|
||||
|
||||
logs_list = self.client.get(reverse("api_v1:system:parser-logs-list"))
|
||||
logs_detail = self.client.get(
|
||||
reverse("api_v1:system:parser-logs-detail", args=[log.id])
|
||||
)
|
||||
logs_export = self.client.get(reverse("api_v1:system:parser-logs-export"))
|
||||
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
base = Path(tmp_dir)
|
||||
refresh_response = self.client.post(
|
||||
reverse(
|
||||
"api_v1:sources:source-cards-refresh",
|
||||
kwargs={"slug": "financial-indicators"},
|
||||
),
|
||||
{},
|
||||
format="json",
|
||||
)
|
||||
upload = SimpleUploadedFile(
|
||||
f"fin_{_digits(5)}_{_digits(13)}.xlsx",
|
||||
_build_fns_excel_bytes(),
|
||||
content_type=(
|
||||
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
|
||||
),
|
||||
)
|
||||
with self.settings(
|
||||
FNS_WATCH_DIRECTORY=str(base / "watch"),
|
||||
FNS_PROCESSED_DIRECTORY=str(base / "processed"),
|
||||
FNS_FAILED_DIRECTORY=str(base / "failed"),
|
||||
):
|
||||
fns_upload = self.client.post(
|
||||
reverse("api_v1:fns:fns-upload"),
|
||||
{"file": upload},
|
||||
format="multipart",
|
||||
)
|
||||
|
||||
for response in (
|
||||
parsing_get,
|
||||
parsing_patch,
|
||||
logs_list,
|
||||
logs_detail,
|
||||
logs_export,
|
||||
refresh_response,
|
||||
fns_upload,
|
||||
):
|
||||
self.assertIn(
|
||||
response.status_code,
|
||||
{
|
||||
status.HTTP_200_OK,
|
||||
status.HTTP_202_ACCEPTED,
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
class RegistersApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
|
||||
def setUp(self):
|
||||
self.user = UserFactory.create_user()
|
||||
self.admin = UserFactory.create_superuser()
|
||||
|
||||
def test_registers_endpoints(self):
|
||||
registry = RegisterFactory(name="Inventory Registry")
|
||||
rows = [
|
||||
{
|
||||
"pn_name": "Inventory Org",
|
||||
"mn_ogrn": "1027600980990",
|
||||
"mn_inn": "7601000086",
|
||||
"in_kpp": "760401001",
|
||||
"mn_okpo": "07506197",
|
||||
}
|
||||
]
|
||||
|
||||
self.authenticate(self.admin)
|
||||
upload = SimpleUploadedFile(
|
||||
"inventory-registry.xlsx",
|
||||
_build_register_excel_bytes(rows),
|
||||
content_type=(
|
||||
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
|
||||
),
|
||||
)
|
||||
upload_response = self.client.post(
|
||||
reverse("api_v1:registers:register-upload"),
|
||||
{
|
||||
"registry": str(registry.id),
|
||||
"actual_date": date(2026, 1, 1).isoformat(),
|
||||
"file": upload,
|
||||
},
|
||||
format="multipart",
|
||||
)
|
||||
self.assertEqual(upload_response.status_code, status.HTTP_201_CREATED)
|
||||
|
||||
self.authenticate(self.user)
|
||||
registries_list = self.client.get(reverse("api_v1:registers:registries-list"))
|
||||
registries_detail = self.client.get(
|
||||
reverse("api_v1:registers:registries-detail", args=[registry.id])
|
||||
)
|
||||
organizations_list = self.client.get(
|
||||
reverse("api_v1:registers:organizations-list")
|
||||
)
|
||||
organization_id = _extract_results(organizations_list.data)[0]["id"]
|
||||
organization_detail = self.client.get(
|
||||
reverse("api_v1:registers:organizations-detail", args=[organization_id])
|
||||
)
|
||||
registry_organizations = self.client.get(
|
||||
reverse(
|
||||
"api_v1:registers:registry-organizations-list",
|
||||
args=[registry.id],
|
||||
)
|
||||
)
|
||||
|
||||
for response in (
|
||||
registries_list,
|
||||
registries_detail,
|
||||
organizations_list,
|
||||
organization_detail,
|
||||
registry_organizations,
|
||||
):
|
||||
self.assertEqual(response.status_code, status.HTTP_200_OK)
|
||||
|
||||
|
||||
class ExchangeApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
|
||||
def setUp(self):
|
||||
self.admin = UserFactory.create_superuser()
|
||||
|
||||
@patch("apps.exchange.services.ExchangeConnectionService.validate_target_structure")
|
||||
@patch("apps.exchange.services.ExchangeConnectionService.test_connection")
|
||||
@patch("apps.exchange.services.ExchangeConnectionService.test_connection_payload")
|
||||
@patch("apps.exchange.views.copy_parsers_data_async.delay")
|
||||
@patch("apps.exchange.services.ExchangeConnectionService.get_active_connection")
|
||||
def test_exchange_endpoints(
|
||||
self,
|
||||
get_active_connection_mock,
|
||||
delay_mock,
|
||||
test_connection_payload_mock,
|
||||
_test_connection_mock,
|
||||
_validate_mock,
|
||||
):
|
||||
self.authenticate(self.admin)
|
||||
connections_url = reverse("api_v1:exchange:connections")
|
||||
test_connection_url = reverse("api_v1:exchange:connections-test")
|
||||
copy_url = reverse("api_v1:exchange:copy")
|
||||
periodic_tasks_url = reverse("api_v1:exchange:periodic-tasks")
|
||||
|
||||
connection_payload = {
|
||||
"server": "127.0.0.1",
|
||||
"port": 5432,
|
||||
"username": "postgres",
|
||||
"password": "secret",
|
||||
"database_name": "target_db",
|
||||
"schema_name": "public",
|
||||
}
|
||||
test_connection_payload_mock.return_value = {
|
||||
"status": "success",
|
||||
"message": "ok",
|
||||
}
|
||||
|
||||
list_connections = self.client.get(connections_url)
|
||||
create_connection = self.client.post(
|
||||
connections_url,
|
||||
connection_payload,
|
||||
format="json",
|
||||
)
|
||||
connection_test = self.client.post(
|
||||
test_connection_url,
|
||||
connection_payload,
|
||||
format="json",
|
||||
)
|
||||
|
||||
active_connection = ExchangeConnection.objects.get(id=create_connection.data["id"])
|
||||
get_active_connection_mock.return_value = active_connection
|
||||
delay_mock.return_value = SimpleNamespace(id="exchange-task-1")
|
||||
copy_response = self.client.post(copy_url, {"mode": "all"}, format="json")
|
||||
|
||||
list_periodic = self.client.get(periodic_tasks_url)
|
||||
create_periodic = self.client.post(
|
||||
periodic_tasks_url,
|
||||
{
|
||||
"name": "inventory-periodic-task",
|
||||
"description": "inventory",
|
||||
"enabled": True,
|
||||
"schedule_type": "interval",
|
||||
"interval_every": 1,
|
||||
"interval_period": "hours",
|
||||
"mode": "all",
|
||||
"notify_on_error": True,
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
periodic_id = create_periodic.data["id"]
|
||||
periodic_detail_url = reverse(
|
||||
"api_v1:exchange:periodic-task-detail",
|
||||
args=[periodic_id],
|
||||
)
|
||||
detail_periodic = self.client.get(periodic_detail_url)
|
||||
patch_periodic = self.client.patch(
|
||||
periodic_detail_url,
|
||||
{
|
||||
"name": "inventory-periodic-task-updated",
|
||||
"enabled": False,
|
||||
"schedule_type": "interval",
|
||||
"interval_every": 2,
|
||||
"interval_period": "hours",
|
||||
"mode": "all",
|
||||
"notify_on_error": False,
|
||||
},
|
||||
format="json",
|
||||
)
|
||||
|
||||
for response in (
|
||||
list_connections,
|
||||
create_connection,
|
||||
connection_test,
|
||||
copy_response,
|
||||
list_periodic,
|
||||
create_periodic,
|
||||
detail_periodic,
|
||||
patch_periodic,
|
||||
):
|
||||
self.assertIn(
|
||||
response.status_code,
|
||||
{
|
||||
status.HTTP_200_OK,
|
||||
status.HTTP_201_CREATED,
|
||||
status.HTTP_202_ACCEPTED,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertTrue(PeriodicTask.objects.filter(id=periodic_id).exists())
|
||||
self.assertTrue(ExchangeConnection.objects.filter(id=active_connection.id).exists())
|
||||
|
||||
|
||||
class BackupsApiInventoryE2ETest(AuthenticatedApiMixin, APITestCase):
|
||||
def setUp(self):
|
||||
self.admin = UserFactory.create_superuser()
|
||||
|
||||
@patch("apps.backups.services.BackupExportJobService._enqueue_backup_task")
|
||||
def test_backup_export_endpoint(self, enqueue_mock):
|
||||
self.authenticate(self.admin)
|
||||
today = timezone.localdate()
|
||||
|
||||
with self.captureOnCommitCallbacks(execute=True):
|
||||
response = self.client.post(
|
||||
reverse("api_v1:backups:export"),
|
||||
{"actual_date": today.isoformat()},
|
||||
format="json",
|
||||
)
|
||||
|
||||
self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
|
||||
self.assertEqual(response.data["status"], "started")
|
||||
self.assertTrue(BackupExportJob.objects.filter(actual_date=today).exists())
|
||||
enqueue_mock.assert_called_once()
|
||||
Reference in New Issue
Block a user