447 lines
18 KiB
Python
447 lines
18 KiB
Python
"""Tests for parsers admin configurations."""
|
|
|
|
import io
|
|
import os
|
|
import tempfile
|
|
import zipfile
|
|
from unittest.mock import patch
|
|
|
|
from apps.parsers.admin import (
|
|
FinancialReportAdmin,
|
|
HasCertificateNumberFilter,
|
|
IndustrialCertificateRecordAdmin,
|
|
InspectionRecordAdmin,
|
|
ManufacturerRecordAdmin,
|
|
ParserLoadLogAdmin,
|
|
ProcurementRecordAdmin,
|
|
ProxyAdmin,
|
|
)
|
|
from apps.parsers.models import (
|
|
FinancialReport,
|
|
FinancialReportLine,
|
|
IndustrialCertificateRecord,
|
|
InspectionRecord,
|
|
ManufacturerRecord,
|
|
ParserLoadLog,
|
|
ProcurementRecord,
|
|
Proxy,
|
|
)
|
|
from django.contrib.admin.sites import AdminSite
|
|
from django.contrib.messages.storage.fallback import FallbackStorage
|
|
from django.core.files.uploadedfile import SimpleUploadedFile
|
|
from django.test import RequestFactory, TestCase, override_settings
|
|
from django.urls import reverse
|
|
from openpyxl import Workbook
|
|
|
|
from tests.apps.parsers.factories import (
|
|
IndustrialCertificateRecordFactory,
|
|
InspectionRecordFactory,
|
|
ManufacturerRecordFactory,
|
|
ParserLoadLogFactory,
|
|
ProxyFactory,
|
|
)
|
|
from tests.apps.user.factories import UserFactory
|
|
from tests.utils.fixtures import fake
|
|
|
|
# Unicode-escaped Cyrillic text "завершен" (Russian; roughly "finished").
# Passed as a record status in the admin status-badge tests below.
_CYRILLIC_FINISHED = "\u0437\u0430\u0432\u0435\u0440\u0448\u0435\u043d"
|
|
# Unicode-escaped Cyrillic text "опублик"; used as a ProcurementRecord status
# in test_procurement_admin_helpers.
# NOTE(review): this looks like a truncated stem of a "published" status,
# presumably because the badge logic matches on a substring - confirm against
# the admin implementation.
_CYRILLIC_PUBLISHED = "\u043e\u043f\u0443\u0431\u043b\u0438\u043a"
|
|
|
|
|
|
def _digits(length: int) -> str:
    """Return a string of ``length`` decimal digits.

    Each digit comes from its own ``fake.random_int(0, 9)`` call, so the
    result may start with zero. A non-positive ``length`` yields ``""``.
    """
    pieces = []
    for _ in range(length):
        pieces.append(str(fake.random_int(0, 9)))
    return "".join(pieces)
|
|
|
|
|
|
def _build_fns_excel_bytes() -> bytes:
    """Build a small three-row workbook in memory and return its raw bytes.

    Row 1 holds the form title and a year, row 2 the column headers, and
    row 3 a single data line (name, four-digit code, two integer values).
    All variable values are produced by ``fake``.
    """
    book = Workbook()
    sheet = book.active

    # The year is drawn first so the sequence of ``fake`` calls stays stable.
    report_year = fake.random_int(min=2020, max=2025)
    sheet.append(["Форма №1", None, report_year, None])
    sheet.append([None, "Код", "Начало", "Конец"])

    data_row = [
        fake.word(),
        _digits(4),
        fake.random_int(min=10, max=999),
        fake.random_int(min=10, max=999),
    ]
    sheet.append(data_row)

    stream = io.BytesIO()
    book.save(stream)
    book.close()
    return stream.getvalue()
|
|
|
|
|
|
def _build_fns_zip_upload() -> SimpleUploadedFile:
    """Return an uploaded ZIP holding one generated workbook and one text file.

    The archive contains a ``fin_<5 digits>_<13 digits>.xlsx`` member built
    by ``_build_fns_excel_bytes`` plus an ``ignored.txt`` member with
    non-spreadsheet content.
    """
    stream = io.BytesIO()
    archive = zipfile.ZipFile(stream, "w", compression=zipfile.ZIP_DEFLATED)
    with archive:
        # Name first, payload second: same evaluation order as a direct call.
        member_name = f"fin_{_digits(5)}_{_digits(13)}.xlsx"
        member_payload = _build_fns_excel_bytes()
        archive.writestr(member_name, member_payload)
        archive.writestr("ignored.txt", b"invalid")

    # Read the buffer only after the archive is closed, so the ZIP is complete.
    zip_bytes = stream.getvalue()
    return SimpleUploadedFile(
        "fin_ropk.zip",
        zip_bytes,
        content_type="application/zip",
    )
|
|
|
|
|
|
def _build_fns_excel_upload() -> SimpleUploadedFile:
    """Return an uploaded ``.xlsx`` file wrapping a freshly generated workbook.

    The file is named ``fin_<5 digits>_<13 digits>.xlsx`` and carries the
    OOXML spreadsheet content type.
    """
    xlsx_mime = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    # Name first, payload second: same evaluation order as a direct call.
    upload_name = f"fin_{_digits(5)}_{_digits(13)}.xlsx"
    payload = _build_fns_excel_bytes()
    return SimpleUploadedFile(upload_name, payload, content_type=xlsx_mime)
|
|
|
|
|
|
class ParsersAdminTest(TestCase):
    """Tests for the parsers ``ModelAdmin`` classes.

    Covers proxy actions and the sync view, status/active badges, shortened
    display helpers, permission overrides, the certificate-number list filter
    and the financial-report upload views.
    """

    def setUp(self):
        """Create an admin site, a request factory and a logged-in superuser."""
        self.site = AdminSite()
        self.factory = RequestFactory()
        self.user = UserFactory.create_superuser()
        self.client.force_login(self.user)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _attach_context(self, request):
        """Attach user, an empty dict session and message storage to ``request``.

        ``RequestFactory`` requests skip middleware, so the attributes that
        admin views and the messages framework read are set by hand.
        """
        request.user = self.user
        request.session = {}
        request._messages = FallbackStorage(request)
        return request

    def _request(self, path="/"):
        """Return a GET request for ``path`` prepared for direct admin calls."""
        return self._attach_context(self.factory.get(path))

    def _post_request(self, path, data):
        """Return a POST request for ``path`` carrying ``data``."""
        return self._attach_context(self.factory.post(path, data=data))

    def _create_procurement(
        self,
        purchase_number,
        status,
        *,
        purchase_name="Name",
        customer_name="Customer",
    ):
        """Create a ``ProcurementRecord`` with generated identifiers.

        Only the fields the tests vary are parameters; the remaining fields
        are filled with ``fake``-generated values and a fixed ``law_type``.
        """
        return ProcurementRecord.objects.create(
            load_batch=fake.random_int(min=1, max=1000),
            purchase_number=purchase_number,
            purchase_name=purchase_name,
            customer_inn=_digits(10),
            customer_kpp=_digits(9),
            customer_ogrn=_digits(13),
            customer_name=customer_name,
            max_price=str(fake.pydecimal(left_digits=7, right_digits=2, positive=True)),
            status=status,
            law_type="44-FZ",
        )

    def _call_upload_view(self, view, request):
        """Call ``view(request)`` inside an isolated FNS upload environment.

        The three ``FNS_*_DIRECTORY`` settings point into a fresh temporary
        directory, and ``apps.parsers.fns_upload.process_fns_file`` is patched
        so that any ``apply_async`` call raises ``AssertionError``: the admin
        upload is expected to process files without enqueueing a task.

        Returns the response produced by ``view``.
        """
        with tempfile.TemporaryDirectory() as tmpdir, override_settings(
            FNS_WATCH_DIRECTORY=os.path.join(tmpdir, "watch"),
            FNS_PROCESSED_DIRECTORY=os.path.join(tmpdir, "processed"),
            FNS_FAILED_DIRECTORY=os.path.join(tmpdir, "failed"),
        ), patch("apps.parsers.fns_upload.process_fns_file") as task_mock:
            task_mock.apply_async.side_effect = AssertionError(
                "admin upload should not enqueue celery task"
            )
            return view(request)

    # ------------------------------------------------------------------
    # Proxy admin
    # ------------------------------------------------------------------

    def test_proxy_admin_actions(self):
        """Activate, deactivate and reset-fail-count actions update the proxy."""
        admin = ProxyAdmin(Proxy, self.site)
        proxy = ProxyFactory(is_active=False, fail_count=5)
        request = self._request()
        qs = Proxy.objects.filter(id=proxy.id)

        admin.activate_proxies(request, qs)
        proxy.refresh_from_db()
        self.assertTrue(proxy.is_active)

        admin.deactivate_proxies(request, qs)
        proxy.refresh_from_db()
        self.assertFalse(proxy.is_active)

        admin.reset_fail_count(request, qs)
        proxy.refresh_from_db()
        self.assertEqual(proxy.fail_count, 0)

    def test_proxy_admin_has_sync_route(self):
        """The proxy admin registers the ``sync_proxy_tools`` URL name."""
        admin = ProxyAdmin(Proxy, self.site)
        route_names = [route.name for route in admin.get_urls()]

        self.assertIn("parsers_proxy_sync_proxy_tools", route_names)

    def test_proxy_admin_changelist_renders_sync_button(self):
        """The proxy changelist page contains the sync button markup."""
        admin = ProxyAdmin(Proxy, self.site)
        response = admin.changelist_view(self._request("/admin/parsers/proxy/"))
        response.render()
        content = response.content.decode("utf-8")

        self.assertEqual(response.status_code, 200)
        self.assertIn("Обновить список прокси", content)
        self.assertIn("mx-object-tool-form", content)

    def test_financial_report_changelist_renders_toolbar_buttons(self):
        """The fetched admin page exposes both FNS upload buttons and links."""
        # NOTE(review): despite the test name, this fetches the admin index
        # page rather than the FinancialReport changelist - confirm that the
        # index is where the toolbar buttons are expected to appear.
        response = self.client.get(reverse("admin:index"))
        content = response.content.decode("utf-8")

        self.assertEqual(response.status_code, 200)
        self.assertIn("ФНС Excel", content)
        self.assertIn("ФНС ZIP", content)
        self.assertIn(
            reverse("admin:parsers_financialreport_upload_excel"),
            content,
        )
        self.assertIn(reverse("admin:parsers_financialreport_upload_zip"), content)

    @patch("apps.parsers.admin.ProxyToolsSyncService.sync_ru_proxies")
    def test_proxy_admin_sync_view_calls_service(self, sync_mock):
        """POSTing to the sync view calls the service once and redirects."""
        sync_mock.return_value = {
            "status": "success",
            "fetched": 3,
            "created": 2,
            "updated": 1,
            "deactivated": 0,
        }
        admin = ProxyAdmin(Proxy, self.site)
        request = self._post_request(
            "/admin/parsers/proxy/sync-proxy-tools/",
            {},
        )

        response = admin.sync_proxy_tools_view(request)

        self.assertEqual(response.status_code, 302)
        sync_mock.assert_called_once_with()

    def test_proxy_active_badge(self):
        """``is_active_badge`` renders a ``span`` for active and inactive proxies."""
        admin = ProxyAdmin(Proxy, self.site)
        active = ProxyFactory(is_active=True)
        inactive = ProxyFactory(is_active=False)
        self.assertIn("span", str(admin.is_active_badge(active)))
        self.assertIn("span", str(admin.is_active_badge(inactive)))

    # ------------------------------------------------------------------
    # Read-only record admins
    # ------------------------------------------------------------------

    def test_parser_load_log_admin_status_badge(self):
        """Load-log admin renders a badge, a source title and forbids adding."""
        admin = ParserLoadLogAdmin(ParserLoadLog, self.site)
        log = ParserLoadLogFactory(
            source=ParserLoadLog.Source.FNS_REPORTS,
            status=ParserLoadLog.Status.SUCCESS,
        )
        badge = admin.status_badge(log)
        self.assertIn("span", str(badge))
        self.assertEqual(admin.source_title(log), "Бухгалтерская отчетность ФНС")
        self.assertIn("source_title", admin.list_display)
        self.assertNotIn("batch_id", admin.list_display)
        request = self._request()
        self.assertFalse(admin.has_add_permission(request))

    def test_certificate_admin_helpers(self):
        """Long organisation names are shortened; add/change are forbidden."""
        admin = IndustrialCertificateRecordAdmin(IndustrialCertificateRecord, self.site)
        record = IndustrialCertificateRecordFactory(organisation_name="X" * 80)
        short_name = admin.organisation_name_short(record)
        self.assertTrue(short_name.endswith("..."))

        request = self._request()
        self.assertFalse(admin.has_add_permission(request))
        self.assertFalse(admin.has_change_permission(request))

    def test_certificate_filter(self):
        """``HasCertificateNumberFilter`` splits records by certificate number."""
        admin = IndustrialCertificateRecordAdmin(IndustrialCertificateRecord, self.site)
        record_good = IndustrialCertificateRecordFactory(certificate_number="CN-1")
        record_bad = IndustrialCertificateRecordFactory(certificate_number="-")

        request = self._request()
        filter_yes = HasCertificateNumberFilter(
            request, {"has_cert_number": "yes"}, IndustrialCertificateRecord, admin
        )
        qs_yes = filter_yes.queryset(request, IndustrialCertificateRecord.objects.all())
        self.assertIn(record_good, qs_yes)
        self.assertNotIn(record_bad, qs_yes)

        filter_no = HasCertificateNumberFilter(
            request, {"has_cert_number": "no"}, IndustrialCertificateRecord, admin
        )
        qs_no = filter_no.queryset(request, IndustrialCertificateRecord.objects.all())
        self.assertIn(record_bad, qs_no)

        # With no filter parameter the good record must still be present.
        filter_none = HasCertificateNumberFilter(
            request, {}, IndustrialCertificateRecord, admin
        )
        qs_none = filter_none.queryset(
            request, IndustrialCertificateRecord.objects.all()
        )
        self.assertIn(record_good, qs_none)

    def test_manufacturer_admin_helpers(self):
        """Long name and address are shortened; add/change are forbidden."""
        admin = ManufacturerRecordAdmin(ManufacturerRecord, self.site)
        record = ManufacturerRecordFactory(
            full_legal_name="Y" * 80,
            address="A" * 80,
        )
        self.assertTrue(admin.full_legal_name_short(record).endswith("..."))
        self.assertTrue(admin.address_short(record).endswith("..."))
        request = self._request()
        self.assertFalse(admin.has_add_permission(request))
        self.assertFalse(admin.has_change_permission(request))

    def test_inspection_admin_helpers(self):
        """Inspection admin shortens long fields and badges every status."""
        admin = InspectionRecordAdmin(InspectionRecord, self.site)
        record = InspectionRecordFactory(
            organisation_name="Org" * 30,
            control_authority="Auth" * 20,
            status=_CYRILLIC_FINISHED,
        )
        self.assertTrue(admin.organisation_name_short(record).endswith("..."))
        self.assertTrue(admin.control_authority_short(record).endswith("..."))
        self.assertIn("span", str(admin.status_badge(record)))

        # Escaped Cyrillic: "в процессе" and "отменена".
        record_progress = InspectionRecordFactory(
            status="\u0432 \u043f\u0440\u043e\u0446\u0435\u0441\u0441\u0435"
        )
        record_cancel = InspectionRecordFactory(
            status="\u043e\u0442\u043c\u0435\u043d\u0435\u043d\u0430"
        )
        record_other = InspectionRecordFactory(status="unknown")
        for extra in (record_progress, record_cancel, record_other):
            self.assertIn("span", str(admin.status_badge(extra)))

        request = self._request()
        self.assertFalse(admin.has_add_permission(request))
        self.assertFalse(admin.has_change_permission(request))

    def test_procurement_admin_helpers(self):
        """Procurement admin shortens long fields and badges every status."""
        admin = ProcurementRecordAdmin(ProcurementRecord, self.site)
        # A shared 18-digit base plus a distinct suffix keeps numbers unique.
        base_number = _digits(18)
        record = self._create_procurement(
            f"{base_number}0",
            _CYRILLIC_PUBLISHED,
            purchase_name="Name" * 30,
            customer_name="Customer" * 20,
        )
        self.assertTrue(admin.purchase_name_short(record).endswith("..."))
        self.assertTrue(admin.customer_name_short(record).endswith("..."))
        self.assertIn("span", str(admin.status_badge(record)))

        record_finished = self._create_procurement(
            f"{base_number}1",
            _CYRILLIC_FINISHED,
        )
        # Escaped Cyrillic: "отменена".
        record_cancel = self._create_procurement(
            f"{base_number}2",
            "\u043e\u0442\u043c\u0435\u043d\u0435\u043d\u0430",
        )
        record_other = self._create_procurement(f"{base_number}3", "unknown")
        for extra in (record_finished, record_cancel, record_other):
            self.assertIn("span", str(admin.status_badge(extra)))

        request = self._request()
        self.assertFalse(admin.has_add_permission(request))
        self.assertFalse(admin.has_change_permission(request))

    # ------------------------------------------------------------------
    # Financial report admin
    # ------------------------------------------------------------------

    def test_financial_report_admin(self):
        """Report admin counts lines, badges status and registers upload URLs."""
        admin = FinancialReportAdmin(FinancialReport, self.site)
        report = FinancialReport.objects.create(
            external_id=_digits(5),
            ogrn=_digits(13),
            file_name=f"fin_{_digits(5)}_{_digits(13)}.xlsx",
            file_hash=fake.sha256(raw_output=False),
            load_batch=fake.random_int(min=1, max=1000),
            status=FinancialReport.Status.SUCCESS,
            source=FinancialReport.SourceType.API,
        )
        FinancialReportLine.objects.create(
            report=report,
            form_code="1",
            line_code=_digits(4),
            line_name=fake.word(),
            year=fake.random_int(min=2020, max=2025),
            period_start=fake.random_int(min=1, max=999),
            period_end=fake.random_int(min=1, max=999),
        )
        self.assertEqual(admin.lines_count(report), 1)
        self.assertIn("span", str(admin.status_badge(report)))
        self.assertIn("registry_organization", admin.list_display)
        self.assertIn("registry_organization__pn_name", admin.search_fields)
        route_names = [route.name for route in admin.get_urls()]
        self.assertIn("parsers_financialreport_upload_excel", route_names)
        self.assertIn("parsers_financialreport_upload_zip", route_names)

    def test_financial_report_admin_upload_excel_get_renders_custom_file_picker(self):
        """GET on the Excel upload view renders a multi-file picker."""
        admin = FinancialReportAdmin(FinancialReport, self.site)
        response = admin.upload_excel_view(self._request())
        response.render()
        content = response.content.decode("utf-8")

        self.assertEqual(response.status_code, 200)
        self.assertIn('type="file"', content)
        self.assertIn("mx-upload-file", content)
        self.assertIn("multiple", content)

    def test_financial_report_admin_upload_zip_view(self):
        """A ZIP upload yields one report and one line, then redirects."""
        admin = FinancialReportAdmin(FinancialReport, self.site)
        archive_upload = _build_fns_zip_upload()
        request = self._post_request(
            "/admin/parsers/financialreport/upload-zip/",
            {"file": archive_upload},
        )

        response = self._call_upload_view(admin.upload_zip_view, request)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(FinancialReport.objects.count(), 1)
        self.assertEqual(FinancialReportLine.objects.count(), 1)

    def test_financial_report_admin_upload_excel_view_processes_multiple_files(self):
        """Two Excel files in one POST yield two reports and two lines."""
        admin = FinancialReportAdmin(FinancialReport, self.site)
        first_upload = _build_fns_excel_upload()
        second_upload = _build_fns_excel_upload()
        request = self._post_request(
            "/admin/parsers/financialreport/upload-excel/",
            {"files": [first_upload, second_upload]},
        )

        response = self._call_upload_view(admin.upload_excel_view, request)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(FinancialReport.objects.count(), 2)
        self.assertEqual(FinancialReportLine.objects.count(), 2)

    def test_financial_report_admin_upload_excel_view_processes_sync(self):
        """A single Excel upload is processed without enqueueing a task."""
        admin = FinancialReportAdmin(FinancialReport, self.site)
        excel_upload = _build_fns_excel_upload()
        request = self._post_request(
            "/admin/parsers/financialreport/upload-excel/",
            {"files": [excel_upload]},
        )

        response = self._call_upload_view(admin.upload_excel_view, request)

        self.assertEqual(response.status_code, 302)
        self.assertEqual(FinancialReport.objects.count(), 1)
        self.assertEqual(FinancialReportLine.objects.count(), 1)
|