def iter_source_clients(self) -> list[tuple[str, VacancyProvider]]:
    """Return the configured vacancy providers in selection order.

    The source names come from ``self._selected_sources()`` and each one is
    looked up in the mapping built by ``self._build_source_clients()``.

    Returns:
        ``(source, provider)`` pairs for every selected source that has a
        provider, in the order the sources were selected.

    Raises:
        VacanciesClientError: when ``self.sources`` is set, the SuperJob
            source is selected, and no provider was built for it.
    """
    available = self._build_source_clients()
    configured: list[tuple[str, VacancyProvider]] = []
    for source_name in self._selected_sources():
        provider = available.get(source_name)
        if provider is not None:
            configured.append((source_name, provider))
            continue
        # A missing SuperJob provider is an error only when the caller set
        # ``self.sources``; otherwise the source is just skipped.
        if self.sources and source_name == SUPERJOB_SOURCE:
            raise VacanciesClientError("SUPERJOB_APP_ID is required")
        logger.info("Vacancy source %s is skipped: not configured", source_name)
    return configured
"supports_company_inn", False): logger.info( "Vacancy source %s is skipped: company_inn is not supported", diff --git a/src/apps/parsers/tasks.py b/src/apps/parsers/tasks.py index b106467..b729140 100644 --- a/src/apps/parsers/tasks.py +++ b/src/apps/parsers/tasks.py @@ -7,6 +7,7 @@ Celery задачи для приложения парсеров. import hashlib import logging +import re import shutil import time import uuid @@ -91,6 +92,30 @@ class RegistryLookupTarget: VACANCY_REGISTRY_MAX_PAGES_PER_ORGANIZATION = 100 +VACANCY_REGISTRY_TEXT_SEARCH_MAX_PAGES_PER_ORGANIZATION = 1 +VACANCY_EMPLOYER_WORD_RE = re.compile(r"[0-9A-Za-zА-Яа-яЁё]+") +VACANCY_EMPLOYER_IGNORED_WORDS = { + "ао", + "акционерное", + "государственное", + "зао", + "индивидуальный", + "ип", + "муниципальное", + "нао", + "некоммерческая", + "оао", + "общество", + "ограниченной", + "ооо", + "ответственностью", + "пао", + "предприниматель", + "публичное", + "с", + "унитарное", + "фгуп", +} def _resolve_lookup_limit( @@ -2929,27 +2954,151 @@ def _fetch_registry_target_vacancy_records( *, page_size: int, ) -> list[GenericParserItem]: - records: list[GenericParserItem] = [] - offset = 0 - for _ in range(VACANCY_REGISTRY_MAX_PAGES_PER_ORGANIZATION): - page_records = client.fetch_vacancies( - limit=page_size, - offset=offset, + iter_source_clients = getattr(client, "iter_source_clients", None) + if iter_source_clients is None: + return _fetch_registry_target_source_vacancy_records( + client, + target, + page_size=page_size, company_inn=target.inn, ) - records.extend( - _attach_registry_vacancy_target(record, target) for record in page_records + + records: list[GenericParserItem] = [] + errors: list[str] = [] + attempts = 0 + + for source, source_client in iter_source_clients(): + if getattr(source_client, "supports_company_inn", False): + kwargs = {"company_inn": target.inn} + else: + if not target.name: + logger.info( + "Vacancy source %s is skipped for registry organization %s: " + "empty organization name", + source, + 
def _fetch_registry_target_source_vacancy_records(
    source_client,
    target: RegistryLookupTarget,
    *,
    page_size: int,
    company_inn: str | None = None,
    text: str | None = None,
    max_pages: int | None = None,
) -> list[GenericParserItem]:
    """Fetch one registry organization's vacancies from a single source.

    Pages through ``source_client.fetch_vacancies`` with ``limit``/``offset``.
    When ``company_inn`` is ``None`` the call is treated as a text search and
    every page is filtered with ``_vacancy_record_matches_registry_target``;
    otherwise all returned records are kept.

    Records are returned as fetched; this helper does not attach any registry
    target metadata to them.
    NOTE(review): the previous implementation wrapped each record with
    ``_attach_registry_vacancy_target`` - confirm the caller does that now.

    Args:
        source_client: object exposing ``fetch_vacancies(limit=, offset=,
            company_inn=, text=)``.
        target: registry organization the vacancies are collected for.
        page_size: number of records requested per page; must be positive.
        company_inn: INN filter passed to the source, if it supports one.
        text: free-text query passed to the source.
        max_pages: optional page cap.  Defaults to
            ``VACANCY_REGISTRY_TEXT_SEARCH_MAX_PAGES_PER_ORGANIZATION`` for
            text searches and ``VACANCY_REGISTRY_MAX_PAGES_PER_ORGANIZATION``
            for INN searches.

    Returns:
        The collected (and, for text searches, employer-filtered) records.

    Raises:
        ValueError: if ``page_size`` is not positive.
        RuntimeError: if an INN search is still returning full pages when the
            page cap is reached.
    """
    # A non-positive page size can never produce a "short" page, so the loop
    # would re-request the same offset until the cap is hit.
    if page_size <= 0:
        raise ValueError(f"page_size must be positive, got {page_size}")

    filter_by_employer_name = company_inn is None
    if max_pages is None:
        max_pages = (
            VACANCY_REGISTRY_TEXT_SEARCH_MAX_PAGES_PER_ORGANIZATION
            if filter_by_employer_name
            else VACANCY_REGISTRY_MAX_PAGES_PER_ORGANIZATION
        )

    records: list[GenericParserItem] = []
    for page_index in range(max_pages):
        page_records = source_client.fetch_vacancies(
            limit=page_size,
            offset=page_index * page_size,
            company_inn=company_inn,
            text=text,
        )
        if filter_by_employer_name:
            records.extend(
                record
                for record in page_records
                if _vacancy_record_matches_registry_target(record, target)
            )
        else:
            records.extend(page_records)
        # A short page means the source has no further results.
        if len(page_records) < page_size:
            return records

    # Text searches are deliberately truncated at the cap instead of failing.
    if filter_by_employer_name:
        return records

    raise RuntimeError(
        "Vacancy registry organization page limit reached "
        f"for organization {target.organization_id} ({target.inn})"
    )
def _vacancy_record_matches_registry_target(
    record: GenericParserItem,
    target: RegistryLookupTarget,
) -> bool:
    """Tell whether a vacancy's employer name refers to the registry target.

    Both names are reduced to match keys (see ``_vacancy_employer_match_key``).
    Equal keys always match.  Otherwise, if both keys are at least 8
    characters long, one key must occur in the other as a run of whole words.

    Returns:
        ``True`` on a match; ``False`` when either key is empty or the keys
        do not match.
    """
    target_key = _vacancy_employer_match_key(target.name)
    employer_key = _vacancy_employer_match_key(_vacancy_record_employer_name(record))
    if not target_key or not employer_key:
        return False
    if target_key == employer_key:
        return True
    # Short keys are too ambiguous for partial matching.
    if min(len(target_key), len(employer_key)) < 8:
        return False
    # Keys are words joined by single spaces, so padding both sides makes the
    # containment test respect word boundaries ("ромашка плюс" must not match
    # inside "агроромашка плюс").
    padded_target = f" {target_key} "
    padded_employer = f" {employer_key} "
    return padded_target in padded_employer or padded_employer in padded_target


def _vacancy_registry_text_query(target: RegistryLookupTarget) -> str:
    """Build the free-text query used to search a source for ``target``.

    Uses the normalized match key of the organization name, falling back to
    the raw name when normalization leaves nothing.
    """
    return _vacancy_employer_match_key(target.name) or target.name


def _vacancy_record_employer_name(record: GenericParserItem) -> str:
    """Extract the employer name from a vacancy record.

    Prefers ``record.organisation_name``.  Otherwise, when ``record.payload``
    is a dict, it tries the ``name`` of the nested ``employer`` / ``company``
    dicts and then the flat keys ``firm_name``, ``company_name`` and
    ``organisation_name``.

    Returns:
        The first non-empty name found, or ``""``.
    """
    if record.organisation_name:
        return record.organisation_name

    payload = record.payload if isinstance(record.payload, dict) else {}
    for key in ("employer", "company"):
        nested = payload.get(key)
        if isinstance(nested, dict) and nested.get("name"):
            return str(nested["name"])
    for key in ("firm_name", "company_name", "organisation_name"):
        value = payload.get(key)
        if value:
            return str(value)
    return ""


def _vacancy_employer_match_key(name: str) -> str:
    """Normalize an employer name into a comparison key.

    The name is case-folded, ``ё`` is replaced with ``е``, it is split into
    words with ``VACANCY_EMPLOYER_WORD_RE``, and words listed in
    ``VACANCY_EMPLOYER_IGNORED_WORDS`` are dropped.

    Returns:
        The remaining words joined by single spaces; ``""`` for an empty or
        missing name.
    """
    if not name:
        return ""
    normalized = name.casefold().replace("ё", "е")
    return " ".join(
        word
        for word in VACANCY_EMPLOYER_WORD_RE.findall(normalized)
        if word not in VACANCY_EMPLOYER_IGNORED_WORDS
    )
{"trudvsem:7701000102"}, ) + @override_settings(SUPERJOB_APP_ID="test-superjob-app-id") + def test_parse_trudvsem_vacancies_matches_job_boards_by_employer_name(self): + organization = OrganizationFactory( + pn_name='Общество с ограниченной ответственностью "Ромашка"', + mn_inn=7701000301, + mn_ogrn=1027700000301, + ) + RegistryMembershipPeriodFactory(organization=organization, ended_at=None) + captured_client_kwargs = {} + captured_text_queries = {} + + class _Provider: + def __init__(self, source_name, *, supports_company_inn): + self.source_name = source_name + self.supports_company_inn = supports_company_inn + + def fetch_vacancies(self, **kwargs): + if self.source_name == "trudvsem": + return [ + GenericParserItem( + source=ParserLoadLog.Source.TRUDVSEM, + external_id="trudvsem:romashka", + inn=kwargs["company_inn"], + title="Работа России", + payload={"vacancy_source": "trudvsem"}, + ) + ] + + captured_text_queries[self.source_name] = kwargs["text"] + return [ + GenericParserItem( + source=ParserLoadLog.Source.TRUDVSEM, + external_id=f"{self.source_name}:romashka", + organisation_name='ООО "Ромашка"', + title=f"{self.source_name} matching vacancy", + payload={"vacancy_source": self.source_name}, + ), + GenericParserItem( + source=ParserLoadLog.Source.TRUDVSEM, + external_id=f"{self.source_name}:other", + organisation_name='ООО "Лютик"', + title=f"{self.source_name} unrelated vacancy", + payload={"vacancy_source": self.source_name}, + ), + ] + + class _VacanciesClient: + def __init__(self, **kwargs): + captured_client_kwargs.update(kwargs) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return None + + def fetch_vacancies(self, **kwargs): + return [ + GenericParserItem( + source=ParserLoadLog.Source.TRUDVSEM, + external_id="trudvsem:romashka", + inn=kwargs["company_inn"], + title="Работа России", + payload={"vacancy_source": "trudvsem"}, + ) + ] + + def iter_source_clients(self): + return [ + ("trudvsem", 
_Provider("trudvsem", supports_company_inn=True)), + ("hh", _Provider("hh", supports_company_inn=False)), + ("superjob", _Provider("superjob", supports_company_inn=False)), + ] + + original_client = parser_tasks.VacanciesClient + parser_tasks.VacanciesClient = _VacanciesClient + try: + result = parse_trudvsem_vacancies(limit=50, proxies=[]) + finally: + parser_tasks.VacanciesClient = original_client + + self.assertEqual(result["status"], "success") + self.assertIsNone(captured_client_kwargs["sources"]) + self.assertEqual( + captured_text_queries, + { + "hh": "ромашка", + "superjob": "ромашка", + }, + ) + self.assertEqual(result["saved"], 3) + self.assertEqual( + set( + GenericParserRecord.objects.values_list( + "external_id", + "source", + "registry_organization_id", + ) + ), + { + ("trudvsem:romashka", "trudvsem", organization.id), + ("hh:romashka", "hh", organization.id), + ("superjob:romashka", "superjob", organization.id), + }, + ) + + def test_registry_job_board_matching_fetches_only_first_text_search_page(self): + organization = OrganizationFactory( + pn_name='Общество с ограниченной ответственностью "Ромашка"', + mn_inn=7701000302, + mn_ogrn=1027700000302, + ) + RegistryMembershipPeriodFactory(organization=organization, ended_at=None) + captured_offsets = [] + + class _Provider: + supports_company_inn = False + + def fetch_vacancies(self, **kwargs): + captured_offsets.append(kwargs["offset"]) + return [ + GenericParserItem( + source=ParserLoadLog.Source.TRUDVSEM, + external_id=f"hh:romashka:{kwargs['offset']}", + organisation_name='ООО "Ромашка"', + title="HeadHunter", + payload={"vacancy_source": "hh"}, + ) + ] + + class _VacanciesClient: + def __init__(self, **kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return None + + def iter_source_clients(self): + return [("hh", _Provider())] + + original_client = parser_tasks.VacanciesClient + parser_tasks.VacanciesClient = _VacanciesClient + try: + 
result = parse_trudvsem_vacancies(limit=1, proxies=[]) + finally: + parser_tasks.VacanciesClient = original_client + + self.assertEqual(result["status"], "success") + self.assertEqual(result["saved"], 1) + self.assertEqual(captured_offsets, [0]) + @override_settings(SUPERJOB_APP_ID="test-superjob-app-id") def test_parse_trudvsem_vacancies_uses_combined_vacancies_client(self): captured_kwargs = {}