feat(organizations): migrate source storage to polymorphic records
This commit is contained in:
962
src/organizations/source_ingestion.py
Normal file
962
src/organizations/source_ingestion.py
Normal file
@@ -0,0 +1,962 @@
|
||||
"""Direct parser ingestion into organization source storage."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections import defaultdict
|
||||
from collections.abc import Iterable, Iterator
|
||||
from dataclasses import dataclass, field
|
||||
from decimal import Decimal
|
||||
from typing import Any
|
||||
|
||||
from django.db import transaction
|
||||
from django.db.models import Count, Max, Min, Q
|
||||
from django.db.models.functions import Lower
|
||||
from django.utils import timezone
|
||||
|
||||
from organizations.models import (
|
||||
Organization,
|
||||
OrganizationSourceExtension,
|
||||
OrganizationSourceFinancialLine,
|
||||
OrganizationSourceRecord,
|
||||
)
|
||||
from organizations.name_normalization import normalize_organization_name
|
||||
from organizations.source_groups import (
|
||||
SourceGroupDescriptor,
|
||||
get_source_group_descriptor,
|
||||
)
|
||||
from organizations.source_identity import normalize_identity_fields
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SourceFinancialLineInput:
|
||||
"""Structured financial line parsed from a source record."""
|
||||
|
||||
form_code: str
|
||||
line_code: str
|
||||
line_name: str
|
||||
year: int
|
||||
period_start: int | None = None
|
||||
period_end: int | None = None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SourceRecordInput:
|
||||
"""Normalized parser output ready for organization source storage."""
|
||||
|
||||
external_id: str
|
||||
title: str
|
||||
organization_name: str
|
||||
inn: str = ""
|
||||
kpp: str = ""
|
||||
ogrn: str = ""
|
||||
ogrip: str = ""
|
||||
record_date: str = ""
|
||||
amount: Decimal | None = None
|
||||
status: str = ""
|
||||
url: str = ""
|
||||
payload: dict[str, Any] = field(default_factory=dict)
|
||||
financial_lines: Iterable[SourceFinancialLineInput] = field(default_factory=tuple)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class OrganizationSourceIngestionResult:
|
||||
"""Counters returned by direct source ingestion."""
|
||||
|
||||
scanned: int = 0
|
||||
created_organizations: int = 0
|
||||
created_extensions: int = 0
|
||||
updated_extensions: int = 0
|
||||
created_records: int = 0
|
||||
updated_records: int = 0
|
||||
created_financial_lines: int = 0
|
||||
updated_financial_lines: int = 0
|
||||
unresolved: int = 0
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class _NormalizedRecordInput:
|
||||
index: int
|
||||
record: SourceRecordInput
|
||||
inn: str
|
||||
kpp: str
|
||||
ogrn: str
|
||||
ogrip: str
|
||||
organization_name: str
|
||||
|
||||
|
||||
class OrganizationSourceIngestionService:
|
||||
"""Persist parser outputs directly into organization source storage."""
|
||||
|
||||
chunk_size = 500
|
||||
|
||||
@classmethod
|
||||
def save_records(
|
||||
cls,
|
||||
*,
|
||||
source: str,
|
||||
load_batch: int | None,
|
||||
records: Iterable[SourceRecordInput],
|
||||
) -> OrganizationSourceIngestionResult:
|
||||
descriptor = get_source_group_descriptor(str(source))
|
||||
deduplicated = cls._deduplicate_records(records)
|
||||
return cls._save_records(descriptor, load_batch, deduplicated)
|
||||
|
||||
@staticmethod
|
||||
def _deduplicate_records(
|
||||
records: Iterable[SourceRecordInput],
|
||||
) -> list[SourceRecordInput]:
|
||||
by_external_id: dict[str, SourceRecordInput] = {}
|
||||
without_external_id = []
|
||||
for record in records:
|
||||
external_id = str(record.external_id or "")
|
||||
if not external_id:
|
||||
without_external_id.append(record)
|
||||
continue
|
||||
by_external_id[external_id] = record
|
||||
return [*by_external_id.values(), *without_external_id]
|
||||
|
||||
@classmethod
|
||||
def _save_records(
|
||||
cls,
|
||||
descriptor: SourceGroupDescriptor,
|
||||
load_batch: int | None,
|
||||
records: list[SourceRecordInput],
|
||||
) -> OrganizationSourceIngestionResult:
|
||||
scanned = len(records)
|
||||
if not records:
|
||||
return OrganizationSourceIngestionResult()
|
||||
|
||||
with transaction.atomic():
|
||||
normalized_records = cls._normalize_records(records)
|
||||
organizations_by_index, created_organizations = (
|
||||
cls._resolve_or_create_organizations(normalized_records)
|
||||
)
|
||||
del normalized_records
|
||||
unresolved = scanned - len(organizations_by_index)
|
||||
|
||||
extensions_by_organization_id, created_extensions, updated_extensions = (
|
||||
cls._resolve_or_create_extensions(
|
||||
descriptor=descriptor,
|
||||
load_batch=load_batch,
|
||||
organizations=organizations_by_index.values(),
|
||||
)
|
||||
)
|
||||
|
||||
touched_extension_ids: set[str] = set()
|
||||
for organization in organizations_by_index.values():
|
||||
extension = extensions_by_organization_id.get(organization.uid)
|
||||
if extension is not None:
|
||||
touched_extension_ids.add(str(extension.uid))
|
||||
|
||||
(
|
||||
created_records,
|
||||
updated_records,
|
||||
created_financial_lines,
|
||||
updated_financial_lines,
|
||||
) = cls._bulk_upsert_source_records(
|
||||
descriptor=descriptor,
|
||||
load_batch=load_batch,
|
||||
records=records,
|
||||
record_inputs_with_extensions=cls._iter_record_inputs_with_extensions(
|
||||
records=records,
|
||||
organizations_by_index=organizations_by_index,
|
||||
extensions_by_organization_id=extensions_by_organization_id,
|
||||
),
|
||||
)
|
||||
|
||||
cls._refresh_extension_counters(touched_extension_ids)
|
||||
|
||||
return OrganizationSourceIngestionResult(
|
||||
scanned=scanned,
|
||||
created_organizations=created_organizations,
|
||||
created_extensions=created_extensions,
|
||||
updated_extensions=updated_extensions,
|
||||
created_records=created_records,
|
||||
updated_records=updated_records,
|
||||
created_financial_lines=created_financial_lines,
|
||||
updated_financial_lines=updated_financial_lines,
|
||||
unresolved=unresolved,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _chunks(values: list[Any], chunk_size: int) -> Iterable[list[Any]]:
|
||||
for offset in range(0, len(values), chunk_size):
|
||||
yield values[offset : offset + chunk_size]
|
||||
|
||||
@staticmethod
|
||||
def _iter_chunks(values: Iterable[Any], chunk_size: int) -> Iterator[list[Any]]:
|
||||
chunk = []
|
||||
for value in values:
|
||||
chunk.append(value)
|
||||
if len(chunk) >= chunk_size:
|
||||
yield chunk
|
||||
chunk = []
|
||||
if chunk:
|
||||
yield chunk
|
||||
|
||||
@staticmethod
|
||||
def _iter_record_inputs_with_extensions(
|
||||
*,
|
||||
records: list[SourceRecordInput],
|
||||
organizations_by_index: dict[int, Organization],
|
||||
extensions_by_organization_id: dict[Any, OrganizationSourceExtension],
|
||||
) -> Iterator[tuple[int, SourceRecordInput, OrganizationSourceExtension]]:
|
||||
for index, organization in organizations_by_index.items():
|
||||
extension = extensions_by_organization_id.get(organization.uid)
|
||||
if extension is not None:
|
||||
yield index, records[index], extension
|
||||
|
||||
@classmethod
|
||||
def _normalize_records(
|
||||
cls,
|
||||
records: list[SourceRecordInput],
|
||||
) -> list[_NormalizedRecordInput]:
|
||||
normalized_records = []
|
||||
for index, record_input in enumerate(records):
|
||||
inn, kpp, ogrn, ogrip = normalize_identity_fields(
|
||||
inn=record_input.inn,
|
||||
kpp=record_input.kpp,
|
||||
ogrn=record_input.ogrn,
|
||||
ogrip=record_input.ogrip,
|
||||
)
|
||||
normalized_records.append(
|
||||
_NormalizedRecordInput(
|
||||
index=index,
|
||||
record=record_input,
|
||||
inn=inn,
|
||||
kpp=kpp,
|
||||
ogrn=ogrn,
|
||||
ogrip=ogrip,
|
||||
organization_name=str(record_input.organization_name or ""),
|
||||
)
|
||||
)
|
||||
return normalized_records
|
||||
|
||||
@classmethod
|
||||
def _resolve_or_create_organizations(
|
||||
cls,
|
||||
normalized_records: list[_NormalizedRecordInput],
|
||||
) -> tuple[dict[int, Organization], int]:
|
||||
organizations_by_index: dict[int, Organization] = {}
|
||||
|
||||
cls._resolve_organizations_by_inn_kpp(
|
||||
normalized_records,
|
||||
organizations_by_index,
|
||||
)
|
||||
cls._resolve_organizations_by_ogrn_or_ogrip(
|
||||
normalized_records,
|
||||
organizations_by_index,
|
||||
)
|
||||
cls._resolve_organizations_by_unique_inn(
|
||||
normalized_records,
|
||||
organizations_by_index,
|
||||
)
|
||||
cls._resolve_organizations_by_exact_name(
|
||||
normalized_records,
|
||||
organizations_by_index,
|
||||
)
|
||||
|
||||
return cls._create_missing_organizations(
|
||||
normalized_records,
|
||||
organizations_by_index,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _resolve_organizations_by_inn_kpp(
|
||||
cls,
|
||||
normalized_records: list[_NormalizedRecordInput],
|
||||
organizations_by_index: dict[int, Organization],
|
||||
) -> None:
|
||||
keys = sorted(
|
||||
{
|
||||
(record.inn, record.kpp)
|
||||
for record in normalized_records
|
||||
if record.index not in organizations_by_index
|
||||
and record.inn
|
||||
and record.kpp
|
||||
}
|
||||
)
|
||||
if not keys:
|
||||
return
|
||||
|
||||
organizations_by_key: dict[tuple[str, str], Organization] = {}
|
||||
for chunk in cls._chunks(keys, cls.chunk_size):
|
||||
query = Q()
|
||||
for inn, kpp in chunk:
|
||||
query |= Q(inn=inn, kpp=kpp)
|
||||
for organization in Organization.objects.filter(query):
|
||||
organizations_by_key[(organization.inn, organization.kpp)] = organization
|
||||
|
||||
for record in normalized_records:
|
||||
if record.index in organizations_by_index:
|
||||
continue
|
||||
organization = organizations_by_key.get((record.inn, record.kpp))
|
||||
if organization is not None:
|
||||
organizations_by_index[record.index] = organization
|
||||
|
||||
@classmethod
|
||||
def _resolve_organizations_by_ogrn_or_ogrip(
|
||||
cls,
|
||||
normalized_records: list[_NormalizedRecordInput],
|
||||
organizations_by_index: dict[int, Organization],
|
||||
) -> None:
|
||||
ogrn_values = sorted(
|
||||
{
|
||||
record.ogrn
|
||||
for record in normalized_records
|
||||
if record.index not in organizations_by_index and record.ogrn
|
||||
}
|
||||
)
|
||||
ogrip_values = sorted(
|
||||
{
|
||||
record.ogrip
|
||||
for record in normalized_records
|
||||
if record.index not in organizations_by_index and record.ogrip
|
||||
}
|
||||
)
|
||||
lookup_values = sorted({*ogrn_values, *ogrip_values})
|
||||
if not lookup_values:
|
||||
return
|
||||
|
||||
by_ogrn: dict[str, Organization] = {}
|
||||
by_ogrip: dict[str, Organization] = {}
|
||||
for chunk in cls._chunks(lookup_values, cls.chunk_size):
|
||||
for organization in Organization.objects.filter(
|
||||
Q(ogrn__in=chunk) | Q(ogrip__in=chunk)
|
||||
):
|
||||
if organization.ogrn:
|
||||
by_ogrn[organization.ogrn] = organization
|
||||
if organization.ogrip:
|
||||
by_ogrip[organization.ogrip] = organization
|
||||
|
||||
for record in normalized_records:
|
||||
if record.index in organizations_by_index:
|
||||
continue
|
||||
organization = by_ogrn.get(record.ogrn) or by_ogrip.get(record.ogrn)
|
||||
if organization is None and record.ogrip:
|
||||
organization = by_ogrip.get(record.ogrip)
|
||||
if organization is not None:
|
||||
organizations_by_index[record.index] = organization
|
||||
|
||||
@classmethod
|
||||
def _resolve_organizations_by_unique_inn(
|
||||
cls,
|
||||
normalized_records: list[_NormalizedRecordInput],
|
||||
organizations_by_index: dict[int, Organization],
|
||||
) -> None:
|
||||
inn_values = sorted(
|
||||
{
|
||||
record.inn
|
||||
for record in normalized_records
|
||||
if record.index not in organizations_by_index and record.inn
|
||||
}
|
||||
)
|
||||
if not inn_values:
|
||||
return
|
||||
|
||||
organizations_by_inn: dict[str, list[Organization]] = defaultdict(list)
|
||||
for chunk in cls._chunks(inn_values, cls.chunk_size):
|
||||
for organization in Organization.objects.filter(inn__in=chunk).order_by(
|
||||
"inn"
|
||||
):
|
||||
organizations_by_inn[organization.inn].append(organization)
|
||||
|
||||
unique_by_inn = {
|
||||
inn: organizations[0]
|
||||
for inn, organizations in organizations_by_inn.items()
|
||||
if len(organizations) == 1
|
||||
}
|
||||
for record in normalized_records:
|
||||
if record.index in organizations_by_index:
|
||||
continue
|
||||
organization = unique_by_inn.get(record.inn)
|
||||
if organization is not None:
|
||||
organizations_by_index[record.index] = organization
|
||||
|
||||
@classmethod
|
||||
def _resolve_organizations_by_exact_name(
|
||||
cls,
|
||||
normalized_records: list[_NormalizedRecordInput],
|
||||
organizations_by_index: dict[int, Organization],
|
||||
) -> None:
|
||||
names = sorted(
|
||||
{
|
||||
record.organization_name.strip()
|
||||
for record in normalized_records
|
||||
if record.index not in organizations_by_index
|
||||
and normalize_organization_name(record.organization_name)
|
||||
}
|
||||
)
|
||||
if not names:
|
||||
return
|
||||
|
||||
names_lower = [name.lower() for name in names]
|
||||
organizations_by_name: dict[str, list[Organization]] = defaultdict(list)
|
||||
for chunk in cls._chunks(names_lower, cls.chunk_size):
|
||||
for organization in Organization.objects.annotate(
|
||||
name_lower=Lower("name")
|
||||
).filter(name_lower__in=chunk):
|
||||
organizations_by_name[organization.name.lower()].append(organization)
|
||||
|
||||
unique_by_name = {
|
||||
name: organizations[0]
|
||||
for name, organizations in organizations_by_name.items()
|
||||
if len(organizations) == 1
|
||||
}
|
||||
for record in normalized_records:
|
||||
if record.index in organizations_by_index:
|
||||
continue
|
||||
organization = unique_by_name.get(record.organization_name.strip().lower())
|
||||
if organization is not None:
|
||||
organizations_by_index[record.index] = organization
|
||||
|
||||
@classmethod
|
||||
def _create_missing_organizations(
|
||||
cls,
|
||||
normalized_records: list[_NormalizedRecordInput],
|
||||
organizations_by_index: dict[int, Organization],
|
||||
) -> tuple[dict[int, Organization], int]:
|
||||
indexes_by_create_key: dict[tuple[str, str], list[int]] = defaultdict(list)
|
||||
organizations_by_create_key: dict[tuple[str, str], Organization] = {}
|
||||
|
||||
for record in normalized_records:
|
||||
if record.index in organizations_by_index:
|
||||
continue
|
||||
|
||||
name = (
|
||||
record.organization_name.strip()
|
||||
or str(record.record.title or "").strip()
|
||||
or str(record.record.external_id or "").strip()
|
||||
)
|
||||
if not name:
|
||||
continue
|
||||
|
||||
create_key = cls._organization_create_key(record, name)
|
||||
indexes_by_create_key[create_key].append(record.index)
|
||||
if create_key in organizations_by_create_key:
|
||||
continue
|
||||
|
||||
organization = Organization(
|
||||
name=name,
|
||||
inn=record.inn,
|
||||
kpp=record.kpp,
|
||||
ogrn=record.ogrn,
|
||||
ogrip=record.ogrip,
|
||||
)
|
||||
organization.identity_status = organization._resolve_identity_status()
|
||||
organization.primary_identity = organization._resolve_primary_identity()
|
||||
organizations_by_create_key[create_key] = organization
|
||||
|
||||
create_instances = list(organizations_by_create_key.values())
|
||||
if not create_instances:
|
||||
return organizations_by_index, 0
|
||||
|
||||
created_count = 0
|
||||
for chunk in cls._chunks(create_instances, cls.chunk_size):
|
||||
created_count += len(
|
||||
Organization.objects.bulk_create(chunk, ignore_conflicts=True)
|
||||
)
|
||||
|
||||
for create_key, organization in organizations_by_create_key.items():
|
||||
persisted = cls._refetch_created_organization(organization)
|
||||
if persisted is None:
|
||||
continue
|
||||
for index in indexes_by_create_key[create_key]:
|
||||
organizations_by_index[index] = persisted
|
||||
|
||||
return organizations_by_index, created_count
|
||||
|
||||
@staticmethod
|
||||
def _organization_create_key(
|
||||
record: _NormalizedRecordInput,
|
||||
name: str,
|
||||
) -> tuple[str, str]:
|
||||
if record.inn and record.kpp:
|
||||
return ("inn_kpp", f"{record.inn}:{record.kpp}")
|
||||
if record.ogrip:
|
||||
return ("ogrip", record.ogrip)
|
||||
if record.ogrn:
|
||||
return ("ogrn", record.ogrn)
|
||||
if record.inn:
|
||||
return ("inn", record.inn)
|
||||
normalized_name = normalize_organization_name(name)
|
||||
if normalized_name:
|
||||
return ("name", normalized_name)
|
||||
return ("external_id", str(record.record.external_id or name))
|
||||
|
||||
@staticmethod
|
||||
def _refetch_created_organization(
|
||||
organization: Organization,
|
||||
) -> Organization | None:
|
||||
if organization.inn and organization.kpp:
|
||||
return Organization.objects.filter(
|
||||
inn=organization.inn,
|
||||
kpp=organization.kpp,
|
||||
).first()
|
||||
if organization.ogrn:
|
||||
return Organization.objects.filter(ogrn=organization.ogrn).first()
|
||||
if organization.ogrip:
|
||||
return Organization.objects.filter(ogrip=organization.ogrip).first()
|
||||
if organization.inn:
|
||||
matches = list(Organization.objects.filter(inn=organization.inn)[:2])
|
||||
return matches[0] if len(matches) == 1 else None
|
||||
return Organization.objects.filter(name__iexact=organization.name).first()
|
||||
|
||||
@classmethod
|
||||
def _resolve_or_create_extensions(
|
||||
cls,
|
||||
*,
|
||||
descriptor: SourceGroupDescriptor,
|
||||
load_batch: int | None,
|
||||
organizations: Iterable[Organization],
|
||||
) -> tuple[dict[Any, OrganizationSourceExtension], int, int]:
|
||||
unique_organizations = {organization.uid: organization for organization in organizations}
|
||||
if not unique_organizations:
|
||||
return {}, 0, 0
|
||||
|
||||
existing_extensions = {
|
||||
extension.organization_id: extension
|
||||
for extension in descriptor.extension_model.objects.filter(
|
||||
organization_id__in=list(unique_organizations)
|
||||
)
|
||||
}
|
||||
|
||||
created_extensions = 0
|
||||
extensions_by_organization_id = dict(existing_extensions)
|
||||
for organization_id, organization in unique_organizations.items():
|
||||
if organization_id in extensions_by_organization_id:
|
||||
continue
|
||||
extensions_by_organization_id[organization_id] = (
|
||||
descriptor.extension_model.objects.create(
|
||||
organization=organization,
|
||||
source_group=descriptor.source_group,
|
||||
title=descriptor.title,
|
||||
last_load_batch=load_batch,
|
||||
)
|
||||
)
|
||||
created_extensions += 1
|
||||
|
||||
changed_extension_ids = []
|
||||
for extension in existing_extensions.values():
|
||||
changed = False
|
||||
if extension.title != descriptor.title:
|
||||
extension.title = descriptor.title
|
||||
changed = True
|
||||
if load_batch is not None and extension.last_load_batch != load_batch:
|
||||
extension.last_load_batch = load_batch
|
||||
changed = True
|
||||
if changed:
|
||||
changed_extension_ids.append(extension.uid)
|
||||
|
||||
updated_extensions = 0
|
||||
if changed_extension_ids:
|
||||
update_kwargs = {
|
||||
"title": descriptor.title,
|
||||
"updated_at": timezone.now(),
|
||||
}
|
||||
if load_batch is not None:
|
||||
update_kwargs["last_load_batch"] = load_batch
|
||||
updated_extensions = descriptor.extension_model.objects.filter(
|
||||
uid__in=changed_extension_ids
|
||||
).update(**update_kwargs)
|
||||
|
||||
return extensions_by_organization_id, created_extensions, updated_extensions
|
||||
|
||||
@classmethod
|
||||
def _bulk_upsert_source_records(
|
||||
cls,
|
||||
*,
|
||||
descriptor: SourceGroupDescriptor,
|
||||
load_batch: int | None,
|
||||
records: list[SourceRecordInput],
|
||||
record_inputs_with_extensions: Iterable[
|
||||
tuple[int, SourceRecordInput, OrganizationSourceExtension]
|
||||
],
|
||||
) -> tuple[int, int, int, int]:
|
||||
created_records = 0
|
||||
updated_records = 0
|
||||
created_financial_lines = 0
|
||||
updated_financial_lines = 0
|
||||
|
||||
for chunk in cls._iter_chunks(record_inputs_with_extensions, cls.chunk_size):
|
||||
source_records_by_index, chunk_created, chunk_updated = (
|
||||
cls._bulk_upsert_source_records_chunk(
|
||||
descriptor=descriptor,
|
||||
load_batch=load_batch,
|
||||
record_inputs_with_extensions=chunk,
|
||||
)
|
||||
)
|
||||
created_records += chunk_created
|
||||
updated_records += chunk_updated
|
||||
|
||||
line_result = cls._save_financial_lines_for_records(
|
||||
records=records,
|
||||
source_records_by_index=source_records_by_index,
|
||||
)
|
||||
created_financial_lines += line_result[0]
|
||||
updated_financial_lines += line_result[1]
|
||||
source_records_by_index.clear()
|
||||
|
||||
return (
|
||||
created_records,
|
||||
updated_records,
|
||||
created_financial_lines,
|
||||
updated_financial_lines,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _bulk_upsert_source_records_chunk(
|
||||
cls,
|
||||
*,
|
||||
descriptor: SourceGroupDescriptor,
|
||||
load_batch: int | None,
|
||||
record_inputs_with_extensions: list[
|
||||
tuple[int, SourceRecordInput, OrganizationSourceExtension]
|
||||
],
|
||||
) -> tuple[dict[int, OrganizationSourceRecord], int, int]:
|
||||
if not record_inputs_with_extensions:
|
||||
return {}, 0, 0
|
||||
|
||||
source = str(descriptor.source)
|
||||
external_ids = [
|
||||
str(record_input.external_id or "")
|
||||
for _, record_input, _ in record_inputs_with_extensions
|
||||
if str(record_input.external_id or "")
|
||||
]
|
||||
existing_by_external_id: dict[str, OrganizationSourceRecord] = {}
|
||||
for source_record in OrganizationSourceRecord.objects.filter(
|
||||
source=source,
|
||||
external_id__in=sorted(set(external_ids)),
|
||||
):
|
||||
existing_by_external_id[source_record.external_id] = source_record
|
||||
|
||||
now = timezone.now()
|
||||
create_instances: list[OrganizationSourceRecord] = []
|
||||
update_instances: list[OrganizationSourceRecord] = []
|
||||
source_records_by_index: dict[int, OrganizationSourceRecord] = {}
|
||||
update_fields = [
|
||||
"extension",
|
||||
"record_type",
|
||||
"title",
|
||||
"record_date",
|
||||
"amount",
|
||||
"status",
|
||||
"url",
|
||||
"payload",
|
||||
"load_batch",
|
||||
"legacy_model",
|
||||
"legacy_pk",
|
||||
"updated_at",
|
||||
]
|
||||
|
||||
for index, record_input, extension in record_inputs_with_extensions:
|
||||
external_id = str(record_input.external_id or "")
|
||||
defaults = {
|
||||
"extension": extension,
|
||||
"record_type": descriptor.record_type,
|
||||
"title": str(record_input.title or ""),
|
||||
"record_date": str(record_input.record_date or ""),
|
||||
"amount": record_input.amount,
|
||||
"status": str(record_input.status or ""),
|
||||
"url": str(record_input.url or ""),
|
||||
"payload": dict(record_input.payload or {}),
|
||||
"load_batch": load_batch,
|
||||
"legacy_model": "",
|
||||
"legacy_pk": "",
|
||||
"updated_at": now,
|
||||
}
|
||||
|
||||
source_record = existing_by_external_id.get(external_id)
|
||||
if source_record is None:
|
||||
source_record = OrganizationSourceRecord(
|
||||
source=source,
|
||||
external_id=external_id,
|
||||
created_at=now,
|
||||
**defaults,
|
||||
)
|
||||
create_instances.append(source_record)
|
||||
else:
|
||||
for field_name, value in defaults.items():
|
||||
setattr(source_record, field_name, value)
|
||||
update_instances.append(source_record)
|
||||
|
||||
source_records_by_index[index] = source_record
|
||||
|
||||
created_records = 0
|
||||
for chunk in cls._chunks(create_instances, cls.chunk_size):
|
||||
created_records += len(OrganizationSourceRecord.objects.bulk_create(chunk))
|
||||
|
||||
for chunk in cls._chunks(update_instances, cls.chunk_size):
|
||||
OrganizationSourceRecord.objects.bulk_update(
|
||||
chunk,
|
||||
fields=update_fields,
|
||||
batch_size=cls.chunk_size,
|
||||
)
|
||||
|
||||
return source_records_by_index, created_records, len(update_instances)
|
||||
|
||||
@classmethod
|
||||
def _save_financial_lines_for_records(
|
||||
cls,
|
||||
*,
|
||||
records: list[SourceRecordInput],
|
||||
source_records_by_index: dict[int, OrganizationSourceRecord],
|
||||
) -> tuple[int, int]:
|
||||
created = 0
|
||||
updated = 0
|
||||
for index, source_record in source_records_by_index.items():
|
||||
line_result = cls._save_financial_lines(
|
||||
source_record,
|
||||
records[index].financial_lines,
|
||||
)
|
||||
created += line_result[0]
|
||||
updated += line_result[1]
|
||||
return created, updated
|
||||
|
||||
@classmethod
|
||||
def _resolve_or_create_organization(
|
||||
cls,
|
||||
record_input: SourceRecordInput,
|
||||
) -> tuple[Organization | None, bool]:
|
||||
inn, kpp, ogrn, ogrip = normalize_identity_fields(
|
||||
inn=record_input.inn,
|
||||
kpp=record_input.kpp,
|
||||
ogrn=record_input.ogrn,
|
||||
ogrip=record_input.ogrip,
|
||||
)
|
||||
organization = cls._resolve_organization(
|
||||
inn=inn,
|
||||
kpp=kpp,
|
||||
ogrn=ogrn,
|
||||
ogrip=ogrip,
|
||||
organization_name=record_input.organization_name,
|
||||
)
|
||||
if organization is not None:
|
||||
return organization, False
|
||||
|
||||
name = (
|
||||
str(record_input.organization_name or "").strip()
|
||||
or str(record_input.title or "").strip()
|
||||
or str(record_input.external_id or "").strip()
|
||||
)
|
||||
if not name:
|
||||
return None, False
|
||||
|
||||
return (
|
||||
Organization.objects.create(
|
||||
name=name,
|
||||
inn=inn,
|
||||
kpp=kpp,
|
||||
ogrn=ogrn,
|
||||
ogrip=ogrip,
|
||||
),
|
||||
True,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _resolve_organization(
|
||||
cls,
|
||||
*,
|
||||
inn: str,
|
||||
kpp: str,
|
||||
ogrn: str,
|
||||
ogrip: str,
|
||||
organization_name: str,
|
||||
) -> Organization | None:
|
||||
for resolver in (
|
||||
cls._resolve_by_inn_kpp,
|
||||
cls._resolve_by_ogrn_or_ogrip,
|
||||
cls._resolve_by_ogrip,
|
||||
cls._resolve_by_unique_inn,
|
||||
cls._resolve_by_exact_normalized_name,
|
||||
):
|
||||
organization = resolver(
|
||||
inn=inn,
|
||||
kpp=kpp,
|
||||
ogrn=ogrn,
|
||||
ogrip=ogrip,
|
||||
organization_name=organization_name,
|
||||
)
|
||||
if organization is not None:
|
||||
return organization
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _resolve_by_inn_kpp(
|
||||
*,
|
||||
inn: str,
|
||||
kpp: str,
|
||||
**_kwargs,
|
||||
) -> Organization | None:
|
||||
if not inn or not kpp:
|
||||
return None
|
||||
return Organization.objects.filter(inn=inn, kpp=kpp).first()
|
||||
|
||||
@staticmethod
|
||||
def _resolve_by_ogrn_or_ogrip(
|
||||
*,
|
||||
ogrn: str,
|
||||
**_kwargs,
|
||||
) -> Organization | None:
|
||||
if not ogrn:
|
||||
return None
|
||||
return (
|
||||
Organization.objects.filter(ogrn=ogrn).first()
|
||||
or Organization.objects.filter(ogrip=ogrn).first()
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _resolve_by_ogrip(
|
||||
*,
|
||||
ogrip: str,
|
||||
**_kwargs,
|
||||
) -> Organization | None:
|
||||
if not ogrip:
|
||||
return None
|
||||
return Organization.objects.filter(ogrip=ogrip).first()
|
||||
|
||||
@staticmethod
|
||||
def _resolve_by_unique_inn(
|
||||
*,
|
||||
inn: str,
|
||||
**_kwargs,
|
||||
) -> Organization | None:
|
||||
if not inn:
|
||||
return None
|
||||
organizations = list(Organization.objects.filter(inn=inn)[:2])
|
||||
return organizations[0] if len(organizations) == 1 else None
|
||||
|
||||
@staticmethod
|
||||
def _resolve_by_exact_normalized_name(
|
||||
*,
|
||||
organization_name: str,
|
||||
**_kwargs,
|
||||
) -> Organization | None:
|
||||
normalized_name = normalize_organization_name(organization_name)
|
||||
if not normalized_name:
|
||||
return None
|
||||
matches = list(Organization.objects.filter(name__iexact=organization_name)[:2])
|
||||
return matches[0] if len(matches) == 1 else None
|
||||
|
||||
@staticmethod
|
||||
def _update_extension(
|
||||
extension: OrganizationSourceExtension,
|
||||
descriptor: SourceGroupDescriptor,
|
||||
load_batch: int | None,
|
||||
) -> int:
|
||||
changed = False
|
||||
if extension.title != descriptor.title:
|
||||
extension.title = descriptor.title
|
||||
changed = True
|
||||
if load_batch is not None and extension.last_load_batch != load_batch:
|
||||
extension.last_load_batch = load_batch
|
||||
changed = True
|
||||
if changed:
|
||||
extension.save(update_fields=["title", "last_load_batch", "updated_at"])
|
||||
return 1
|
||||
return 0
|
||||
|
||||
@staticmethod
|
||||
def _upsert_source_record(
|
||||
*,
|
||||
descriptor: SourceGroupDescriptor,
|
||||
extension: OrganizationSourceExtension,
|
||||
record_input: SourceRecordInput,
|
||||
load_batch: int | None,
|
||||
) -> tuple[OrganizationSourceRecord, bool]:
|
||||
defaults = {
|
||||
"extension": extension,
|
||||
"record_type": descriptor.record_type,
|
||||
"title": str(record_input.title or ""),
|
||||
"record_date": str(record_input.record_date or ""),
|
||||
"amount": record_input.amount,
|
||||
"status": str(record_input.status or ""),
|
||||
"url": str(record_input.url or ""),
|
||||
"payload": dict(record_input.payload or {}),
|
||||
"load_batch": load_batch,
|
||||
"legacy_model": "",
|
||||
"legacy_pk": "",
|
||||
}
|
||||
external_id = str(record_input.external_id or "")
|
||||
if external_id:
|
||||
return OrganizationSourceRecord.objects.update_or_create(
|
||||
source=str(descriptor.source),
|
||||
external_id=external_id,
|
||||
defaults=defaults,
|
||||
)
|
||||
return (
|
||||
OrganizationSourceRecord.objects.create(
|
||||
source=str(descriptor.source),
|
||||
external_id="",
|
||||
**defaults,
|
||||
),
|
||||
True,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _save_financial_lines(
|
||||
source_record: OrganizationSourceRecord,
|
||||
financial_lines: Iterable[SourceFinancialLineInput],
|
||||
) -> tuple[int, int]:
|
||||
created = 0
|
||||
updated = 0
|
||||
for line in financial_lines:
|
||||
_, was_created = OrganizationSourceFinancialLine.objects.update_or_create(
|
||||
source_record=source_record,
|
||||
form_code=str(line.form_code),
|
||||
line_code=str(line.line_code),
|
||||
year=line.year,
|
||||
defaults={
|
||||
"line_name": str(line.line_name),
|
||||
"period_start": line.period_start,
|
||||
"period_end": line.period_end,
|
||||
},
|
||||
)
|
||||
if was_created:
|
||||
created += 1
|
||||
else:
|
||||
updated += 1
|
||||
return created, updated
|
||||
|
||||
@staticmethod
|
||||
def _refresh_extension_counters(extension_ids: set[str]) -> None:
|
||||
if not extension_ids:
|
||||
return
|
||||
|
||||
aggregates = {
|
||||
row["extension_id"]: row
|
||||
for row in OrganizationSourceRecord.objects.filter(
|
||||
extension_id__in=extension_ids
|
||||
)
|
||||
.values("extension_id")
|
||||
.annotate(
|
||||
records_count=Count("uid"),
|
||||
first_seen_at=Min("created_at"),
|
||||
last_seen_at=Max("created_at"),
|
||||
last_load_batch=Max("load_batch"),
|
||||
)
|
||||
}
|
||||
|
||||
now = timezone.now()
|
||||
extensions = list(
|
||||
OrganizationSourceExtension.objects.filter(uid__in=extension_ids)
|
||||
)
|
||||
for extension in extensions:
|
||||
aggregate = aggregates.get(extension.uid, {})
|
||||
extension.records_count = aggregate.get("records_count") or 0
|
||||
extension.first_seen_at = aggregate.get("first_seen_at")
|
||||
extension.last_seen_at = aggregate.get("last_seen_at")
|
||||
extension.last_load_batch = aggregate.get("last_load_batch")
|
||||
extension.updated_at = now
|
||||
|
||||
OrganizationSourceExtension.objects.bulk_update(
|
||||
extensions,
|
||||
fields=[
|
||||
"records_count",
|
||||
"first_seen_at",
|
||||
"last_seen_at",
|
||||
"last_load_batch",
|
||||
"updated_at",
|
||||
],
|
||||
batch_size=1000,
|
||||
)
|
||||
Reference in New Issue
Block a user