fix(parsers): unblock source status and heavy product import
All checks were successful
CI/CD Pipeline / Quality Gate (push) Successful in 19s
CI/CD Pipeline / Build and Push Images (push) Successful in 6s
CI/CD Pipeline / Internal Notify (push) Successful in 1s
CI/CD Pipeline / Deploy Dev in Dokploy (push) Successful in 1s

This commit is contained in:
2026-04-28 22:51:52 +02:00
parent e921f4b204
commit 01387ae13b
6 changed files with 182 additions and 25 deletions

View File

@@ -93,6 +93,12 @@ def _normalize_header(value) -> str:
return re.sub(r"[^a-zа-я0-9]+", "", text) return re.sub(r"[^a-zа-я0-9]+", "", text)
NORMALIZED_HEADER_ALIASES = {
field_name: {_normalize_header(alias) for alias in aliases}
for field_name, aliases in HEADER_ALIASES.items()
}
@dataclass @dataclass
class IndustrialProductsClient: class IndustrialProductsClient:
""" """
@@ -205,34 +211,36 @@ class IndustrialProductsClient:
logger.info("Downloading Excel file: %s", file_url) logger.info("Downloading Excel file: %s", file_url)
content = self.http_client.download_file(file_url) content = self.http_client.download_file(file_url)
workbook = load_workbook(filename=BytesIO(content), data_only=True) workbook = load_workbook(
worksheet = workbook.active filename=BytesIO(content),
data_only=True,
read_only=True,
)
try:
worksheet = workbook.active
header_row_index, header_map = self._detect_headers(worksheet) header_row_index, header_map = self._detect_headers(worksheet)
products: list[IndustrialProduct] = [] products: list[IndustrialProduct] = []
for row in worksheet.iter_rows( for row in worksheet.iter_rows(
min_row=header_row_index + 1, min_row=header_row_index + 1,
values_only=True, values_only=True,
): ):
product = self._parse_row(row, header_map) product = self._parse_row(row, header_map)
if product: if product:
products.append(product) products.append(product)
workbook.close() return products
return products finally:
workbook.close()
def _detect_headers(self, worksheet) -> tuple[int, dict[str, int]]: def _detect_headers(self, worksheet) -> tuple[int, dict[str, int]]:
best_map: dict[str, int] = {} best_map: dict[str, int] = {}
for row_index in range(1, min(worksheet.max_row, 10) + 1): for row_index, row in enumerate(
row = next( worksheet.iter_rows(max_row=10, values_only=True),
worksheet.iter_rows( start=1,
min_row=row_index, ):
max_row=row_index,
values_only=True,
)
)
header_map = self._build_header_map(row) header_map = self._build_header_map(row)
if len(header_map) > len(best_map): if len(header_map) > len(best_map):
best_map = header_map best_map = header_map
@@ -254,8 +262,8 @@ class IndustrialProductsClient:
if not normalized: if not normalized:
continue continue
for field_name, aliases in HEADER_ALIASES.items(): for field_name, aliases in NORMALIZED_HEADER_ALIASES.items():
if normalized in {_normalize_header(alias) for alias in aliases}: if normalized in aliases:
header_map[field_name] = index header_map[field_name] = index
break break

View File

@@ -13,6 +13,7 @@ from apps.core.services import BackgroundJobService
from apps.parsers.models import ( from apps.parsers.models import (
FinancialReport, FinancialReport,
FinancialReportLine, FinancialReportLine,
GenericParserRecord,
IndustrialCertificateRecord, IndustrialCertificateRecord,
IndustrialProductRecord, IndustrialProductRecord,
InspectionRecord, InspectionRecord,
@@ -100,6 +101,9 @@ SOURCE_CARD_DEFINITIONS: tuple[SourceCardDefinition, ...] = (
task_names=( task_names=(
"apps.parsers.tasks.parse_procurements", "apps.parsers.tasks.parse_procurements",
"apps.parsers.tasks.sync_procurements", "apps.parsers.tasks.sync_procurements",
"apps.parsers.tasks.parse_procurements_44fz",
"apps.parsers.tasks.parse_procurements_223fz",
"apps.parsers.tasks.parse_contracts",
), ),
source_items=( source_items=(
SourceItemDefinition( SourceItemDefinition(
@@ -108,6 +112,24 @@ SOURCE_CARD_DEFINITIONS: tuple[SourceCardDefinition, ...] = (
description=("Закупки и связанные данные из ЕИС по 44-ФЗ и 223-ФЗ."), description=("Закупки и связанные данные из ЕИС по 44-ФЗ и 223-ФЗ."),
parser_source=ParserLoadLog.Source.PROCUREMENTS, parser_source=ParserLoadLog.Source.PROCUREMENTS,
), ),
SourceItemDefinition(
code="procurements_44fz",
title="Закупки 44-ФЗ",
description="Извещения и закупочные процедуры 44-ФЗ.",
parser_source=ParserLoadLog.Source.PROCUREMENTS_44FZ,
),
SourceItemDefinition(
code="procurements_223fz",
title="Закупки 223-ФЗ",
description="Извещения и закупочные процедуры 223-ФЗ.",
parser_source=ParserLoadLog.Source.PROCUREMENTS_223FZ,
),
SourceItemDefinition(
code="contracts",
title="Контракты ЕИС",
description="Государственные и корпоративные контракты.",
parser_source=ParserLoadLog.Source.CONTRACTS,
),
), ),
refresh_params=( refresh_params=(
RefreshParamDefinition( RefreshParamDefinition(
@@ -203,6 +225,11 @@ SOURCE_CARD_BY_PARSER_SOURCE = {
for source_item in definition.source_items for source_item in definition.source_items
if source_item.parser_source if source_item.parser_source
} }
GENERIC_RECORD_SOURCES_BY_ITEM_CODE = {
"procurements_44fz": ParserLoadLog.Source.PROCUREMENTS_44FZ,
"procurements_223fz": ParserLoadLog.Source.PROCUREMENTS_223FZ,
"contracts": ParserLoadLog.Source.CONTRACTS,
}
class SourceCardService: class SourceCardService:
@@ -576,6 +603,9 @@ class SourceCardService:
@classmethod @classmethod
def _get_source_records_count(cls, item_code: str) -> int: def _get_source_records_count(cls, item_code: str) -> int:
generic_source = GENERIC_RECORD_SOURCES_BY_ITEM_CODE.get(item_code)
if generic_source:
return GenericParserRecord.objects.filter(source=generic_source).count()
if item_code == "fns_reports": if item_code == "fns_reports":
return FinancialReportLine.objects.count() return FinancialReportLine.objects.count()
if item_code == "industrial": if item_code == "industrial":
@@ -592,6 +622,15 @@ class SourceCardService:
@classmethod @classmethod
def _get_source_organizations_count(cls, item_code: str) -> int: def _get_source_organizations_count(cls, item_code: str) -> int:
generic_source = GENERIC_RECORD_SOURCES_BY_ITEM_CODE.get(item_code)
if generic_source:
return (
GenericParserRecord.objects.filter(source=generic_source)
.exclude(inn="")
.values("inn")
.distinct()
.count()
)
if item_code == "fns_reports": if item_code == "fns_reports":
return ( return (
FinancialReport.objects.exclude(ogrn="") FinancialReport.objects.exclude(ogrn="")
@@ -638,6 +677,11 @@ class SourceCardService:
@classmethod @classmethod
def _get_source_data_timestamp(cls, item_code: str): def _get_source_data_timestamp(cls, item_code: str):
generic_source = GENERIC_RECORD_SOURCES_BY_ITEM_CODE.get(item_code)
if generic_source:
return GenericParserRecord.objects.filter(source=generic_source).aggregate(
last_updated=Max("updated_at")
)["last_updated"]
if item_code == "fns_reports": if item_code == "fns_reports":
return FinancialReport.objects.aggregate(last_updated=Max("updated_at"))[ return FinancialReport.objects.aggregate(last_updated=Max("updated_at"))[
"last_updated" "last_updated"
@@ -670,6 +714,24 @@ class SourceCardService:
definition: SourceCardDefinition, definition: SourceCardDefinition,
source_items: list[dict[str, Any]], source_items: list[dict[str, Any]],
) -> int: ) -> int:
if definition.slug == "public-procurements":
legacy_inns = (
ProcurementRecord.objects.exclude(customer_inn="")
.order_by()
.values_list("customer_inn", flat=True)
.distinct()
)
generic_inns = (
GenericParserRecord.objects.filter(
source__in=GENERIC_RECORD_SOURCES_BY_ITEM_CODE.values()
)
.exclude(inn="")
.order_by()
.values_list("inn", flat=True)
.distinct()
)
return legacy_inns.union(generic_inns).count()
if definition.slug != "manufacturers-and-products": if definition.slug != "manufacturers-and-products":
return sum(item["organizations_count"] for item in source_items) return sum(item["organizations_count"] for item in source_items)

View File

@@ -59,6 +59,8 @@ FEDRESURS_CHECKO_FALLBACK_LIMIT = 100
PARSER_STALE_LOAD_MAX_AGE_MINUTES = 90 PARSER_STALE_LOAD_MAX_AGE_MINUTES = 90
PARSER_SOFT_TIME_LIMIT_SECONDS = 15 * 60 PARSER_SOFT_TIME_LIMIT_SECONDS = 15 * 60
PARSER_TIME_LIMIT_SECONDS = 20 * 60 PARSER_TIME_LIMIT_SECONDS = 20 * 60
INDUSTRIAL_PRODUCTS_SOFT_TIME_LIMIT_SECONDS = 45 * 60
INDUSTRIAL_PRODUCTS_TIME_LIMIT_SECONDS = 60 * 60
class ParserSourceSkipped(Exception): class ParserSourceSkipped(Exception):
@@ -792,8 +794,8 @@ def parse_manufactures(
@shared_task( @shared_task(
bind=True, bind=True,
soft_time_limit=PARSER_SOFT_TIME_LIMIT_SECONDS, soft_time_limit=INDUSTRIAL_PRODUCTS_SOFT_TIME_LIMIT_SECONDS,
time_limit=PARSER_TIME_LIMIT_SECONDS, time_limit=INDUSTRIAL_PRODUCTS_TIME_LIMIT_SECONDS,
) )
def parse_industrial_products( def parse_industrial_products(
self, self,

View File

@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
from unittest.mock import patch
from urllib.parse import urlparse from urllib.parse import urlparse
import requests import requests
@@ -22,6 +23,7 @@ from apps.parsers.clients.minpromtorg.schemas import (
from apps.parsers.clients.proverki import ProverkiClient from apps.parsers.clients.proverki import ProverkiClient
from apps.parsers.clients.proverki.schemas import Inspection from apps.parsers.clients.proverki.schemas import Inspection
from django.test import TestCase, tag from django.test import TestCase, tag
from openpyxl import load_workbook as openpyxl_load_workbook
from requests.adapters import BaseAdapter from requests.adapters import BaseAdapter
from tests.utils import Response, TestHTTPServer from tests.utils import Response, TestHTTPServer
@@ -513,6 +515,40 @@ class IndustrialProductsClientTest(TestCase):
self.assertEqual(products, []) self.assertEqual(products, [])
def test_fetch_products_uses_read_only_workbook(self):
excel_bytes, rows = build_minpromtorg_products_excel(count=2)
file_name = "industrial_products_20260428.xlsx"
with TestHTTPServer() as server:
server.add_json(
"/api/kss-document-preview",
{
"data": [
{
"name": IndustrialProductsClient().query,
"files": [
{"name": file_name, "url": f"/files/{file_name}"}
],
}
]
},
)
server.add_bytes(f"/files/{file_name}", excel_bytes)
client = IndustrialProductsClient(
host=_host_from_base_url(server.base_url),
scheme="http",
http_adapter=server.adapter,
)
with patch(
"apps.parsers.clients.minpromtorg.products.load_workbook",
wraps=openpyxl_load_workbook,
) as load_workbook_mock:
products = client.fetch_products()
self.assertEqual(len(products), len(rows))
self.assertTrue(load_workbook_mock.call_args.kwargs["read_only"])
def test_get_latest_file_url_falls_back_to_excel_file(self): def test_get_latest_file_url_falls_back_to_excel_file(self):
client = IndustrialProductsClient() client = IndustrialProductsClient()
files = [ files = [

View File

@@ -5,6 +5,7 @@ from types import SimpleNamespace
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from apps.core.models import BackgroundJob, JobStatus from apps.core.models import BackgroundJob, JobStatus
from apps.parsers.models import GenericParserRecord, ParserLoadLog
from apps.parsers.source_cards import ( from apps.parsers.source_cards import (
SourceCardDefinition, SourceCardDefinition,
SourceCardService, SourceCardService,
@@ -296,6 +297,42 @@ class SourceCardServiceUnitTest(SimpleTestCase):
@override_settings(PARSER_STALE_LOAD_MAX_AGE_MINUTES=90) @override_settings(PARSER_STALE_LOAD_MAX_AGE_MINUTES=90)
class SourceCardServiceDatabaseTest(TestCase): class SourceCardServiceDatabaseTest(TestCase):
def test_public_procurements_counts_generic_eis_sources(self):
GenericParserRecord.objects.create(
source=ParserLoadLog.Source.PROCUREMENTS_44FZ,
load_batch=1,
external_id="notice-1",
inn="7701234567",
title="Закупка 44-ФЗ",
payload={"number": "notice-1"},
)
GenericParserRecord.objects.create(
source=ParserLoadLog.Source.CONTRACTS,
load_batch=1,
external_id="contract-1",
inn="7701234567",
title="Контракт ЕИС",
payload={"number": "contract-1"},
)
ParserLoadLog.objects.create(
source=ParserLoadLog.Source.PROCUREMENTS_44FZ,
batch_id=1,
records_count=1,
status=ParserLoadLog.Status.SUCCESS,
)
ParserLoadLog.objects.create(
source=ParserLoadLog.Source.CONTRACTS,
batch_id=1,
records_count=1,
status=ParserLoadLog.Status.SUCCESS,
)
card = SourceCardService.get_card("public-procurements")
self.assertEqual(card["status"], "success")
self.assertEqual(card["records_count"], 2)
self.assertEqual(card["organizations_count"], 1)
def test_get_active_tasks_ignores_old_jobs_even_when_updated_recently(self): def test_get_active_tasks_ignores_old_jobs_even_when_updated_recently(self):
job = BackgroundJob.objects.create( job = BackgroundJob.objects.create(
task_id="old-source-task", task_id="old-source-task",

View File

@@ -40,6 +40,8 @@ from apps.parsers.models import (
) )
from apps.parsers.services import ParserLoadLogService from apps.parsers.services import ParserLoadLogService
from apps.parsers.tasks import ( from apps.parsers.tasks import (
INDUSTRIAL_PRODUCTS_SOFT_TIME_LIMIT_SECONDS,
INDUSTRIAL_PRODUCTS_TIME_LIMIT_SECONDS,
_move_to_dir, _move_to_dir,
_process_fns_file_sync, _process_fns_file_sync,
_remove_lock, _remove_lock,
@@ -589,6 +591,16 @@ class SyncProcurementsTaskTestCase(TestCase):
class MinpromtorgTasksTestCase(TestCase): class MinpromtorgTasksTestCase(TestCase):
"""Tests for Minpromtorg tasks.""" """Tests for Minpromtorg tasks."""
def test_parse_industrial_products_has_extended_time_limits(self):
self.assertEqual(
parse_industrial_products.soft_time_limit,
INDUSTRIAL_PRODUCTS_SOFT_TIME_LIMIT_SECONDS,
)
self.assertEqual(
parse_industrial_products.time_limit,
INDUSTRIAL_PRODUCTS_TIME_LIMIT_SECONDS,
)
def _add_minpromtorg_routes(self, server: TestHTTPServer): def _add_minpromtorg_routes(self, server: TestHTTPServer):
certificates_bytes, cert_rows = build_minpromtorg_certificates_excel(count=2) certificates_bytes, cert_rows = build_minpromtorg_certificates_excel(count=2)
manufacturers_bytes, manuf_rows = build_minpromtorg_manufacturers_excel(count=2) manufacturers_bytes, manuf_rows = build_minpromtorg_manufacturers_excel(count=2)