feat(parsers): добавлен парсер zakupki.gov.ru с SOAP API интеграцией

Реализована полная интеграция с ЕИС Закупки через SOAP API
(FTP доступ закрыт с 01.01.2025).

Добавлено:
- ZakupkiClient с поддержкой SOAP методов getDocsByOrgRegionRequest
  и getDocsByReestrNumberRequest
- Модель ProcurementRecord (18 полей, 3 индекса)
- ProcurementService и ParserLoadLogService для бизнес-логики
- Celery задачи parse_procurements и sync_procurements
- Админка с цветовой индикацией статусов и фильтрами
- 71 тест (unit + E2E с RUN_E2E_TESTS=1)

Требования: токен SOAP API через Госуслуги

🤖 Generated with [Qoder][https://qoder.com]
This commit is contained in:
2026-01-27 16:01:28 +01:00
parent 199d871923
commit c6483d8427
16 changed files with 3405 additions and 0 deletions

View File

@@ -13,10 +13,12 @@ from apps.parsers.clients.minpromtorg import (
ManufacturesClient,
)
from apps.parsers.clients.proverki import ProverkiClient
from apps.parsers.clients.zakupki import ZakupkiClient
__all__ = [
"BaseHTTPClient",
"IndustrialProductionClient",
"ManufacturesClient",
"ProverkiClient",
"ZakupkiClient",
]

View File

@@ -0,0 +1,863 @@
"""
Клиент для парсинга данных с zakupki.gov.ru.
Источник: Единая информационная система в сфере закупок (ЕИС).
Стратегия получения данных:
1. SOAP API через int44.zakupki.gov.ru (основной метод с 01.01.2025)
2. Парсинг XML файлов из архивов
Примечание:
FTP доступ закрыт с 1 января 2025 года.
Для работы требуется токен, который можно получить через Госуслуги.
"""
import io
import logging
import re
import uuid
import zipfile
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime
from xml.etree import ( # noqa: S314 - XML parsing with proper error handling
ElementTree as ET,
)
from apps.parsers.clients.base import BaseHTTPClient, HTTPClientError
from apps.parsers.clients.zakupki.schemas import Procurement, ProcurementPlan
logger = logging.getLogger(__name__)
# SOAP API конфигурация
SOAP_API_URL = "https://int44.zakupki.gov.ru/eis-integration/services/getDocsIP"
SOAP_NAMESPACE = "http://zakupki.gov.ru/fz44/get-docs-ip/ws"
# HTTP конфигурация (fallback для прямых ссылок)
DEFAULT_HOST = "zakupki.gov.ru"
# Типы подсистем
SUBSYSTEM_TYPES = {
"44": "PRIZ", # 44-ФЗ Закупки
"223": "OOS223", # 223-ФЗ
}
# Типы документов для 44-ФЗ
DOCUMENT_TYPES_44 = {
"notification": "epNotificationEF2020", # Извещения электронного аукциона
"notification_ok": "epNotificationOK2020", # Открытый конкурс
"notification_zk": "epNotificationZK2020", # Запрос котировок
"contract": "contract", # Контракты
}
class ZakupkiClientError(HTTPClientError):
"""Ошибка клиента zakupki.gov.ru."""
pass
@dataclass
class ZakupkiClient:
"""
Клиент для получения данных о закупках с zakupki.gov.ru.
Полностью изолирован от Django. Все настройки передаются через конструктор.
Стратегия работы:
1. Отправляет SOAP запрос на int44.zakupki.gov.ru
2. Получает URL архива с данными
3. Скачивает и парсит XML файлы из архива
Использование:
client = ZakupkiClient(token="your-token-from-gosuslugi")
procurements = client.fetch_procurements(region_code="77", year=2025)
for proc in procurements:
print(proc.purchase_number, proc.customer_inn)
Примечание:
Для работы требуется токен, который можно получить на:
https://zakupki.gov.ru/pmd/auth/welcome
через авторизацию в Госуслугах.
"""
token: str | None = None # Токен для SOAP API (обязателен для работы)
proxies: list[str] | None = None
host: str = DEFAULT_HOST
timeout: int = 120
_http_client: BaseHTTPClient | None = field(default=None, repr=False)
def __post_init__(self) -> None:
"""Инициализация клиента."""
self._http_client = None
@property
def http_client(self) -> BaseHTTPClient:
"""Ленивая инициализация HTTP клиента."""
if self._http_client is None:
self._http_client = BaseHTTPClient(
base_url=f"https://{self.host}",
proxies=self.proxies,
timeout=self.timeout,
headers={
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
"Accept": "application/json, application/xml, text/html, */*",
"Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
},
)
return self._http_client
def fetch_procurements(
self,
*,
region_code: str | None = None,
year: int | None = None,
month: int | None = None,
file_url: str | None = None,
law_type: str = "44",
reestr_number: str | None = None,
document_type: str = "notification",
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Procurement]:
"""
Получить список закупок.
Args:
region_code: Код региона (например, "77" для Москвы)
year: Год данных
month: Месяц (опционально)
file_url: Прямая ссылка на файл данных (HTTP URL)
law_type: Тип закона ("44" или "223")
reestr_number: Реестровый номер закупки (для точечного запроса)
document_type: Тип документа (notification, contract, etc.)
progress_callback: Callback для отчёта о прогрессе (percent, message)
Returns:
Список закупок
Raises:
ZakupkiClientError: При ошибке получения данных
"""
logger.info(
"Fetching procurements (region=%s, year=%s, month=%s, law=%s-FZ)",
region_code,
year,
month,
law_type,
)
if progress_callback:
progress_callback(0, "Инициализация...")
try:
# Если передан прямой HTTP URL - скачиваем через HTTP
if file_url and file_url.startswith("http"):
return self._download_and_parse_http(file_url, progress_callback)
# Если есть токен - используем SOAP API
if self.token:
return self._fetch_via_soap(
region_code=region_code,
year=year,
month=month,
day=None,
law_type=law_type,
reestr_number=reestr_number,
document_type=document_type,
progress_callback=progress_callback,
)
# Без токена - пробуем HTTP fallback (может не работать)
logger.warning("No token provided, trying HTTP fallback")
return self._fetch_via_http(
region_code=region_code,
year=year,
month=month,
law_type=law_type,
progress_callback=progress_callback,
)
except HTTPClientError:
raise
except Exception as e:
logger.error("Error fetching procurements: %s", e)
raise ZakupkiClientError(f"Failed to fetch procurements: {e}") from e
def _fetch_via_soap( # noqa: C901
self,
*,
region_code: str | None = None,
year: int | None = None,
month: int | None = None,
day: int | None = None,
law_type: str = "44",
reestr_number: str | None = None,
document_type: str = "notification",
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Procurement]:
"""Загрузка данных через SOAP API."""
if not self.token:
raise ZakupkiClientError("Token is required for SOAP API access")
if progress_callback:
progress_callback(5, "Формирование SOAP запроса...")
# Определяем метод и параметры запроса
if reestr_number:
# Запрос по реестровому номеру
soap_request = self._build_soap_request_by_reestr_number(
reestr_number=reestr_number,
law_type=law_type,
)
elif region_code:
# Запрос по региону
soap_request = self._build_soap_request_by_region(
region_code=region_code,
law_type=law_type,
document_type=document_type,
year=year,
month=month,
day=day,
)
else:
raise ZakupkiClientError("Either region_code or reestr_number is required")
if progress_callback:
progress_callback(10, "Отправка запроса к API...")
# Отправляем SOAP запрос
try:
response = self.http_client.post(
SOAP_API_URL,
data=soap_request.encode("utf-8"),
headers={
"Content-Type": "text/xml; charset=utf-8",
"SOAPAction": "",
},
)
except HTTPClientError as e:
logger.error("SOAP request failed: %s", e)
raise ZakupkiClientError(f"SOAP request failed: {e}") from e
if progress_callback:
progress_callback(30, "Обработка ответа...")
# Парсим ответ и получаем URL архива
archive_url = self._parse_soap_response(response)
if not archive_url:
logger.warning("No archive URL in SOAP response")
if progress_callback:
progress_callback(100, "Данные не найдены")
return []
if progress_callback:
progress_callback(40, "Скачивание архива...")
# Скачиваем архив (с токеном в заголовке!)
try:
archive_content = self.http_client.download_file(
archive_url,
headers={"individualPerson_token": self.token},
)
except HTTPClientError as e:
logger.error("Failed to download archive: %s", e)
raise ZakupkiClientError(f"Failed to download archive: {e}") from e
if progress_callback:
progress_callback(70, "Парсинг данных...")
# Парсим архив
procurements = self._parse_archive_content(archive_content, archive_url)
if progress_callback:
progress_callback(95, f"Загружено {len(procurements)} закупок")
logger.info("Total fetched %d procurements via SOAP", len(procurements))
return procurements
def _build_soap_request_by_reestr_number(
self,
reestr_number: str,
law_type: str = "44",
) -> str:
"""Построить SOAP запрос по реестровому номеру."""
request_id = str(uuid.uuid4())
created_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
subsystem_type = SUBSYSTEM_TYPES.get(law_type, "PRIZ")
return f"""<?xml version="1.0" encoding="UTF-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:ws="{SOAP_NAMESPACE}">
<soapenv:Header>
<individualPerson_token>{self.token}</individualPerson_token>
</soapenv:Header>
<soapenv:Body>
<ws:getDocsByReestrNumberRequest>
<index>
<id>{request_id}</id>
<createDateTime>{created_time}</createDateTime>
<mode>PROD</mode>
</index>
<selectionParams>
<subsystemType>{subsystem_type}</subsystemType>
<reestrNumber>{reestr_number}</reestrNumber>
</selectionParams>
</ws:getDocsByReestrNumberRequest>
</soapenv:Body>
</soapenv:Envelope>"""
def _build_soap_request_by_region(
self,
region_code: str,
law_type: str = "44",
document_type: str = "notification",
year: int | None = None,
month: int | None = None,
day: int | None = None,
) -> str:
"""Построить SOAP запрос по региону."""
request_id = str(uuid.uuid4())
created_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
subsystem_type = SUBSYSTEM_TYPES.get(law_type, "PRIZ")
doc_type = DOCUMENT_TYPES_44.get(document_type, "epNotificationEF2020")
# Формируем дату для запроса
if year and month and day:
date_str = f"{year:04d}-{month:02d}-{day:02d}"
period_xml = f"<exactDate>{date_str}</exactDate>"
elif year and month:
# Берём последний день месяца как точную дату
# (API не поддерживает диапазоны напрямую)
date_str = f"{year:04d}-{month:02d}-01"
period_xml = f"<exactDate>{date_str}</exactDate>"
elif year:
date_str = f"{year:04d}-01-01"
period_xml = f"<exactDate>{date_str}</exactDate>"
else:
# Сегодняшняя дата
date_str = datetime.now().strftime("%Y-%m-%d")
period_xml = f"<exactDate>{date_str}</exactDate>"
# ВАЖНО: порядок тегов критичен для SOAP!
return f"""<?xml version="1.0" encoding="UTF-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
xmlns:ws="{SOAP_NAMESPACE}">
<soapenv:Header>
<individualPerson_token>{self.token}</individualPerson_token>
</soapenv:Header>
<soapenv:Body>
<ws:getDocsByOrgRegionRequest>
<index>
<id>{request_id}</id>
<createDateTime>{created_time}</createDateTime>
<mode>PROD</mode>
</index>
<selectionParams>
<orgRegion>{region_code}</orgRegion>
<subsystemType>{subsystem_type}</subsystemType>
<documentType44>{doc_type}</documentType44>
<periodInfo>
{period_xml}
</periodInfo>
</selectionParams>
</ws:getDocsByOrgRegionRequest>
</soapenv:Body>
</soapenv:Envelope>"""
def _parse_soap_response(self, response_content: bytes) -> str | None:
"""Извлечь URL архива из SOAP ответа."""
try:
xml_str = response_content.decode("utf-8")
root = ET.fromstring(xml_str) # noqa: S314
# Ищем archiveUrl в ответе
# Структура: soap:Envelope/soap:Body/ns2:*Response/dataInfo/archiveUrl
for elem in root.iter():
if elem.tag.endswith("archiveUrl") and elem.text:
logger.info("Found archive URL: %s", elem.text)
return elem.text.strip()
# Проверяем на ошибки
for elem in root.iter():
if "fault" in elem.tag.lower() or "error" in elem.tag.lower():
error_text = elem.text or ET.tostring(elem, encoding="unicode")
logger.error("SOAP error: %s", error_text)
raise ZakupkiClientError(f"SOAP error: {error_text}")
logger.warning("No archiveUrl found in SOAP response")
return None
except ET.ParseError as e:
logger.error("Failed to parse SOAP response: %s", e)
raise ZakupkiClientError(f"Invalid SOAP response: {e}") from e
def _fetch_via_http(
self,
*,
region_code: str | None = None,
year: int | None = None,
month: int | None = None,
law_type: str = "44",
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Procurement]:
"""Загрузка данных через HTTP (fallback, может не работать)."""
plans = self._discover_data_files(
region_code=region_code,
year=year,
month=month,
law_type=law_type,
)
if not plans:
logger.warning(
"No data files found for region=%s, year=%s", region_code, year
)
return []
if progress_callback:
progress_callback(10, f"Найдено {len(plans)} файлов данных")
all_procurements = []
for i, plan in enumerate(plans):
if progress_callback:
progress = 10 + (i * 80) // len(plans)
progress_callback(progress, f"Загрузка {plan.file_name}...")
procurements = self._download_and_parse_http(plan.file_url, None)
all_procurements.extend(procurements)
logger.info(
"Parsed %d procurements from %s", len(procurements), plan.file_name
)
if progress_callback:
progress_callback(95, f"Загружено {len(all_procurements)} закупок")
logger.info("Total fetched %d procurements via HTTP", len(all_procurements))
return all_procurements
def _discover_data_files(
self,
*,
region_code: str | None = None,
year: int | None = None,
month: int | None = None,
law_type: str = "44",
) -> list[ProcurementPlan]:
"""Найти доступные файлы данных для указанного периода (HTTP fallback)."""
plans = []
if not region_code or not year:
return plans
fz_suffix = f"fz{law_type}"
if month:
file_name = f"notifications_{region_code}_{year}{month:02d}_{fz_suffix}.zip"
file_url = (
f"https://{self.host}/opendata/download/"
f"notifications/{region_code}/{year}/{month:02d}/{fz_suffix}.zip"
)
else:
file_name = f"notifications_{region_code}_{year}_{fz_suffix}.zip"
file_url = (
f"https://{self.host}/opendata/download/"
f"notifications/{region_code}/{year}/{fz_suffix}.zip"
)
plans.append(
ProcurementPlan(
region_code=region_code,
year=year,
month=month,
file_url=file_url,
file_name=file_name,
file_format="zip",
)
)
logger.info(
"Discovered %d data files for region=%s, year=%s, month=%s",
len(plans),
region_code,
year,
month,
)
return plans
def _download_and_parse_http(
self,
file_url: str,
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Procurement]:
"""Скачать файл через HTTP и распарсить его содержимое."""
logger.info("Downloading: %s", file_url)
if progress_callback:
progress_callback(20, f"Скачивание {file_url}...")
try:
content = self.http_client.download_file(file_url)
except HTTPClientError as e:
logger.warning("Failed to download %s: %s", file_url, e)
return []
logger.info("Downloaded %d bytes", len(content))
return self._parse_archive_content(content, file_url)
def _parse_archive_content(
self,
content: bytes,
source_name: str,
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Procurement]:
"""Определить тип файла и распарсить содержимое."""
# Определяем тип файла по содержимому
is_zip = content[:4] == b"PK\x03\x04"
is_xml = content[:5] == b"<?xml" or (
content[:100].strip().startswith(b"<")
and b"<html" not in content[:100].lower()
)
if is_zip:
return self._parse_zip_archive(content, progress_callback)
elif is_xml:
return self._parse_xml_content(content, progress_callback)
else:
try:
return self._parse_xml_content(content, progress_callback)
except Exception as e:
raise ZakupkiClientError(
f"Unknown file format for {source_name}",
url=source_name,
) from e
def _parse_zip_archive(
self,
content: bytes,
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Procurement]:
"""Распаковать ZIP архив и распарсить XML файлы внутри."""
procurements = []
with zipfile.ZipFile(io.BytesIO(content)) as zf:
xml_files = [
name for name in zf.namelist() if name.lower().endswith(".xml")
]
if not xml_files:
logger.warning("No XML files found in ZIP archive")
return []
logger.info("Found %d XML files in archive", len(xml_files))
for i, xml_name in enumerate(xml_files):
if progress_callback:
progress = 30 + (i * 60) // len(xml_files)
progress_callback(progress, f"Парсинг {xml_name}...")
xml_content = zf.read(xml_name)
file_procurements = self._parse_xml_content(xml_content, None)
procurements.extend(file_procurements)
return procurements
def _parse_xml_content( # noqa: C901
self,
content: bytes,
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Procurement]:
"""Распарсить XML содержимое файла закупок."""
procurements = []
try:
# Пробуем разные кодировки
for encoding in ["utf-8", "windows-1251", "cp1251"]:
try:
xml_str = content.decode(encoding)
break
except UnicodeDecodeError:
continue
else:
xml_str = content.decode("utf-8", errors="replace")
# Очистка невалидных XML символов
xml_str = self._sanitize_xml(xml_str)
root = ET.fromstring(xml_str) # noqa: S314
# Определяем namespace
ns = {}
root_tag = root.tag
if root_tag.startswith("{"):
ns_uri = root_tag[1 : root_tag.index("}")]
ns["ns"] = ns_uri
logger.debug("Detected XML namespace: %s", ns_uri)
# Ищем записи о закупках
procurement_tags = [
".//ns:notification" if ns else None,
".//ns:purchaseNotice" if ns else None,
".//ns:fcsNotification" if ns else None,
".//notification",
".//purchaseNotice",
".//fcsNotification",
".//notificationOK",
".//notificationEF",
".//notificationZK",
".//notificationEP",
".//record",
".//item",
]
records = []
for tag in procurement_tags:
if tag is None:
continue
try:
if ns and tag.startswith(".//ns:"):
found = root.findall(tag, ns)
else:
found = root.findall(tag)
if found:
records = found
logger.info("Found %d records with tag %s", len(found), tag)
break
except Exception as e:
logger.debug("Tag %s search failed: %s", tag, e)
continue
if not records:
records = list(root)
logger.debug("Using %d root children as records", len(records))
for record in records:
procurement = self._parse_xml_record(record, ns.get("ns"))
if procurement:
procurements.append(procurement)
except ET.ParseError as e:
logger.error("XML parse error: %s", e)
raise ZakupkiClientError(f"Failed to parse XML: {e}") from e
return procurements
def _sanitize_xml(self, xml_str: str) -> str:
"""Очистить XML строку от невалидных символов."""
# Удаляем недопустимые XML символы
illegal_xml_chars_re = re.compile(
r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x84\x86-\x9f]"
)
xml_str = illegal_xml_chars_re.sub("", xml_str)
# Заменяем неэкранированные амперсанды
xml_str = re.sub(
r"&(?!(?:amp|lt|gt|apos|quot|#\d+|#x[0-9a-fA-F]+);)", "&amp;", xml_str
)
return xml_str
def _parse_xml_record( # noqa: C901
self, element: ET.Element, ns_uri: str | None = None
) -> Procurement | None:
"""Преобразовать XML элемент в объект Procurement."""
try:
def find_child(tag_name: str) -> ET.Element | None:
"""Найти дочерний элемент с учётом namespace."""
if ns_uri:
child = element.find(f"{{{ns_uri}}}{tag_name}")
if child is not None:
return child
return element.find(tag_name)
def get_text(tag_names: list[str]) -> str:
"""Получить текст элемента."""
for name in tag_names:
if name in element.attrib:
return element.attrib[name].strip()
for name in tag_names:
child = find_child(name)
if child is not None:
if child.text:
return child.text.strip()
for sub in child:
if sub.text:
return sub.text.strip()
return ""
def get_nested_text(parent_tags: list[str], child_tags: list[str]) -> str:
"""Получить текст из вложенного элемента."""
for parent_tag in parent_tags:
parent = find_child(parent_tag)
if parent is not None:
for child_tag in child_tags:
if ns_uri:
child = parent.find(f"{{{ns_uri}}}{child_tag}")
else:
child = parent.find(child_tag)
if child is not None and child.text:
return child.text.strip()
return ""
purchase_number = get_text(
[
"purchaseNumber",
"regNum",
"registrationNumber",
"notificationNumber",
"number",
]
)
purchase_name = get_text(
[
"purchaseObjectInfo",
"name",
"objectInfo",
"subject",
"purchaseName",
]
)
customer_inn = get_nested_text(
["customer", "organizationInfo", "organization", "responsibleOrg"],
["INN", "inn"],
)
customer_kpp = get_nested_text(
["customer", "organizationInfo", "organization"],
["KPP", "kpp"],
)
customer_ogrn = get_nested_text(
["customer", "organizationInfo", "organization"],
["OGRN", "ogrn"],
)
customer_name = get_nested_text(
["customer", "organizationInfo", "organization", "responsibleOrg"],
["fullName", "shortName", "name", "organizationName"],
)
max_price = get_nested_text(
["lot", "lotData", "contractConditions"],
["maxPrice", "maxContractPrice", "initialSum", "sum"],
)
if not max_price:
max_price = get_text(["maxPrice", "initialSum"])
currency_code = get_nested_text(
["lot", "lotData", "currency"],
["code", "currencyCode"],
)
if not currency_code:
currency_code = "RUB"
placement_method = get_nested_text(
["placingWay", "purchaseMethod"],
["name", "methodName", "code"],
)
if not placement_method:
placement_method = get_text(["placingWay", "purchaseMethod", "epName"])
publish_date = get_text(
["publishDate", "docPublishDate", "createDate", "publishDTInEIS"]
)
end_date = get_text(
["endDate", "submissionCloseDate", "applicationEndDate", "endDT"]
)
status = get_text(
["state", "status", "notificationStatus", "currentStatus"]
)
law_type = ""
href_val = get_text(["href", "url", "link"])
if "44" in element.tag or "fcs" in element.tag.lower():
law_type = "44-FZ"
elif "223" in element.tag:
law_type = "223-FZ"
purchase_object_info = get_text(
["purchaseObjectInfo", "objectInfo", "description"]
)
procurement = Procurement(
purchase_number=purchase_number,
purchase_name=purchase_name,
customer_inn=customer_inn,
customer_kpp=customer_kpp,
customer_ogrn=customer_ogrn,
customer_name=customer_name,
max_price=max_price,
currency_code=currency_code,
placement_method=placement_method,
publish_date=publish_date,
end_date=end_date,
status=status,
law_type=law_type,
purchase_object_info=purchase_object_info,
href=href_val,
)
if not procurement.purchase_number and not procurement.customer_inn:
logger.debug(
"Empty procurement from element %s, attribs: %s",
element.tag,
list(element.attrib.keys())[:5],
)
return None
return procurement
except Exception as e:
logger.warning("Failed to parse XML record: %s", e)
return None
def fetch_procurement_plans(
self, region_code: str, year: int
) -> list[ProcurementPlan]:
"""Получить список доступных файлов закупок за год."""
return self._discover_data_files(region_code=region_code, year=year)
def fetch_by_reestr_number(
self,
reestr_number: str,
law_type: str = "44",
progress_callback: Callable[[int, str], None] | None = None,
) -> list[Procurement]:
"""
Получить данные по реестровому номеру закупки.
Args:
reestr_number: Реестровый номер (например, "0888200000224000038")
law_type: Тип закона ("44" или "223")
progress_callback: Callback для отчёта о прогрессе
Returns:
Список закупок (обычно одна)
"""
return self.fetch_procurements(
reestr_number=reestr_number,
law_type=law_type,
progress_callback=progress_callback,
)
def close(self) -> None:
"""Закрыть клиент и освободить ресурсы."""
if self._http_client is not None:
self._http_client.close()
self._http_client = None
def __enter__(self) -> "ZakupkiClient":
"""Поддержка context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Закрытие при выходе из context manager."""
self.close()

View File

@@ -0,0 +1,94 @@
"""
Dataclass схемы для данных zakupki.gov.ru.
Эти классы представляют данные о государственных закупках, возвращаемые клиентом.
Они не зависят от Django ORM и используются как DTO.
"""
from dataclasses import dataclass
@dataclass(frozen=True)
class Procurement:
"""
Государственная закупка из ЕИС zakupki.gov.ru.
Источник: Единая информационная система в сфере закупок.
Содержит данные о закупках по 44-ФЗ и 223-ФЗ.
"""
purchase_number: str
"""Реестровый номер закупки."""
purchase_name: str
"""Наименование закупки."""
customer_inn: str
"""ИНН заказчика."""
customer_kpp: str
"""КПП заказчика."""
customer_ogrn: str
"""ОГРН заказчика."""
customer_name: str
"""Наименование заказчика."""
max_price: str
"""Начальная (максимальная) цена контракта."""
currency_code: str
"""Код валюты (RUB, USD и т.д.)."""
placement_method: str
"""Способ определения поставщика."""
publish_date: str
"""Дата публикации извещения."""
end_date: str
"""Дата окончания подачи заявок."""
status: str
"""Статус закупки."""
law_type: str
"""Тип закона (44-ФЗ, 223-ФЗ)."""
purchase_object_info: str = ""
"""Информация об объекте закупки."""
href: str = ""
"""Ссылка на страницу закупки."""
@dataclass(frozen=True)
class ProcurementPlan:
"""
План загрузки закупок на определённый период.
Содержит метаданные о файле с данными.
"""
region_code: str
"""Код региона."""
year: int
"""Год данных."""
month: int | None
"""Месяц (если данные помесячные)."""
file_url: str
"""URL файла с данными."""
file_name: str
"""Имя файла."""
records_count: int = 0
"""Количество записей (если известно)."""
file_format: str = "xml"
"""Формат файла (xml, csv)."""