perf(organizations): cache and instrument API responses
This commit is contained in:
@@ -4,15 +4,20 @@ Core middleware components.
|
||||
Provides Request ID tracking and other cross-cutting concerns.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from django.db import connection
|
||||
from django.middleware.csrf import CsrfViewMiddleware
|
||||
from django.template.response import ContentNotRenderedError
|
||||
from django.urls import Resolver404, resolve
|
||||
from django.utils.deprecation import MiddlewareMixin
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
organization_metrics_logger = logging.getLogger("organizations.api.metrics")
|
||||
|
||||
# Thread-local storage for request context
|
||||
_request_context = threading.local()
|
||||
@@ -170,3 +175,102 @@ class RequestLoggingMiddleware(MiddlewareMixin):
|
||||
},
|
||||
)
|
||||
return response
|
||||
|
||||
|
||||
class OrganizationApiMetricsMiddleware:
|
||||
"""Emit focused request metrics for API endpoints that serve organizations."""
|
||||
|
||||
def __init__(self, get_response):
|
||||
self.get_response = get_response
|
||||
|
||||
def __call__(self, request):
|
||||
if not self._should_log(request):
|
||||
return self.get_response(request)
|
||||
|
||||
db_query_count = 0
|
||||
db_duration_seconds = 0.0
|
||||
started_at = time.perf_counter()
|
||||
response = None
|
||||
error = None
|
||||
|
||||
def execute_with_metrics(execute, sql, params, many, context):
|
||||
nonlocal db_query_count, db_duration_seconds
|
||||
|
||||
db_query_count += 1
|
||||
query_started_at = time.perf_counter()
|
||||
try:
|
||||
return execute(sql, params, many, context)
|
||||
finally:
|
||||
db_duration_seconds += time.perf_counter() - query_started_at
|
||||
|
||||
try:
|
||||
with connection.execute_wrapper(execute_with_metrics):
|
||||
response = self.get_response(request)
|
||||
return response
|
||||
except Exception as exc:
|
||||
error = exc.__class__.__name__
|
||||
raise
|
||||
finally:
|
||||
duration_seconds = time.perf_counter() - started_at
|
||||
self._log_metrics(
|
||||
request=request,
|
||||
response=response,
|
||||
error=error,
|
||||
duration_seconds=duration_seconds,
|
||||
db_query_count=db_query_count,
|
||||
db_duration_seconds=db_duration_seconds,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _should_log(request) -> bool:
|
||||
path = getattr(request, "path_info", "") or getattr(request, "path", "")
|
||||
if not path.startswith("/api/"):
|
||||
return False
|
||||
return "organizations" in path.strip("/").split("/")
|
||||
|
||||
@classmethod
|
||||
def _log_metrics(
|
||||
cls,
|
||||
*,
|
||||
request,
|
||||
response,
|
||||
error: str | None,
|
||||
duration_seconds: float,
|
||||
db_query_count: int,
|
||||
db_duration_seconds: float,
|
||||
) -> None:
|
||||
metrics = {
|
||||
"request_id": getattr(request, "request_id", None),
|
||||
"method": request.method,
|
||||
"path": getattr(request, "path_info", "") or request.path,
|
||||
"status_code": getattr(response, "status_code", 500),
|
||||
"duration_ms": round(duration_seconds * 1000, 2),
|
||||
"db_query_count": db_query_count,
|
||||
"db_duration_ms": round(db_duration_seconds * 1000, 2),
|
||||
"response_size_bytes": cls._response_size(response),
|
||||
"cache": response.get("X-Cache") if response is not None else None,
|
||||
"user_id": cls._user_id(request),
|
||||
"query_keys": sorted(request.GET.keys()),
|
||||
"error": error,
|
||||
}
|
||||
organization_metrics_logger.info(
|
||||
"organization_api_metrics %s",
|
||||
json.dumps(metrics, ensure_ascii=False, sort_keys=True),
|
||||
extra={"organization_api_metrics": metrics},
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _response_size(response) -> int | None:
|
||||
if response is None or getattr(response, "streaming", False):
|
||||
return None
|
||||
try:
|
||||
return len(response.content)
|
||||
except (AttributeError, ContentNotRenderedError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _user_id(request) -> int | None:
|
||||
user = getattr(request, "user", None)
|
||||
if user is None or not getattr(user, "is_authenticated", False):
|
||||
return None
|
||||
return getattr(user, "id", None)
|
||||
|
||||
@@ -7,3 +7,6 @@ class OrganizationsConfig(AppConfig):
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
name = "organizations"
|
||||
verbose_name = "Организации"
|
||||
|
||||
def ready(self) -> None:
|
||||
import organizations.signals # noqa: F401
|
||||
|
||||
59
src/organizations/cache.py
Normal file
59
src/organizations/cache.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""Cache helpers for organizations API responses."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
from django.core.cache import cache
|
||||
|
||||
ORGANIZATION_API_CACHE_PREFIX = "api:v2:organizations"
|
||||
ORGANIZATION_API_CACHE_VERSION_KEY = f"{ORGANIZATION_API_CACHE_PREFIX}:version"
|
||||
DEFAULT_ORGANIZATION_API_CACHE_VERSION = 1
|
||||
DEFAULT_ORGANIZATION_API_CACHE_TIMEOUT_SECONDS = 24 * 60 * 60
|
||||
|
||||
|
||||
def get_organization_api_cache_version() -> int:
|
||||
"""Return the current organizations API cache version."""
|
||||
version = cache.get(ORGANIZATION_API_CACHE_VERSION_KEY)
|
||||
if version is None:
|
||||
cache.add(
|
||||
ORGANIZATION_API_CACHE_VERSION_KEY,
|
||||
DEFAULT_ORGANIZATION_API_CACHE_VERSION,
|
||||
timeout=None,
|
||||
)
|
||||
version = cache.get(
|
||||
ORGANIZATION_API_CACHE_VERSION_KEY,
|
||||
DEFAULT_ORGANIZATION_API_CACHE_VERSION,
|
||||
)
|
||||
|
||||
try:
|
||||
return int(version)
|
||||
except (TypeError, ValueError):
|
||||
cache.set(
|
||||
ORGANIZATION_API_CACHE_VERSION_KEY,
|
||||
DEFAULT_ORGANIZATION_API_CACHE_VERSION,
|
||||
timeout=None,
|
||||
)
|
||||
return DEFAULT_ORGANIZATION_API_CACHE_VERSION
|
||||
|
||||
|
||||
def invalidate_organization_api_cache() -> int:
|
||||
"""
|
||||
Invalidate organizations API responses by moving them to a new key version.
|
||||
|
||||
Versioning avoids broad cache scans and works with cache backends that do not
|
||||
support pattern deletion.
|
||||
"""
|
||||
new_version = _new_cache_version()
|
||||
if cache.add(ORGANIZATION_API_CACHE_VERSION_KEY, new_version, timeout=None):
|
||||
return new_version
|
||||
|
||||
try:
|
||||
return int(cache.incr(ORGANIZATION_API_CACHE_VERSION_KEY))
|
||||
except (TypeError, ValueError, NotImplementedError):
|
||||
cache.set(ORGANIZATION_API_CACHE_VERSION_KEY, new_version, timeout=None)
|
||||
return new_version
|
||||
|
||||
|
||||
def _new_cache_version() -> int:
|
||||
return time.time_ns()
|
||||
@@ -6,6 +6,7 @@ import json
|
||||
|
||||
from apps.core.management.commands.base import BaseAppCommand
|
||||
|
||||
from organizations.cache import invalidate_organization_api_cache
|
||||
from organizations.services import OrganizationDataSnapshotRefreshService
|
||||
|
||||
|
||||
@@ -36,6 +37,7 @@ class Command(BaseAppCommand):
|
||||
organization_uids=options.get("uids"),
|
||||
batch_size=options["batch_size"],
|
||||
)
|
||||
invalidate_organization_api_cache()
|
||||
rendered = json.dumps(
|
||||
{
|
||||
"processed": result.processed,
|
||||
|
||||
@@ -49,6 +49,13 @@ class OrganizationSerializer(serializers.ModelSerializer):
|
||||
enrichment = self.context.get("enrichment", {}).get(str(obj.uid))
|
||||
if enrichment is None:
|
||||
return {}
|
||||
data_sources = self.context.get("data_sources")
|
||||
if data_sources is not None:
|
||||
return {
|
||||
source: enrichment.data_presence.get(source, [])
|
||||
for source in data_sources
|
||||
if source in enrichment.data_presence
|
||||
}
|
||||
return enrichment.data_presence
|
||||
|
||||
def get_data_sources(self, obj) -> list[dict[str, int | str]]:
|
||||
|
||||
89
src/organizations/signals.py
Normal file
89
src/organizations/signals.py
Normal file
@@ -0,0 +1,89 @@
|
||||
"""Invalidation hooks for organizations API cache."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from apps.parsers.models import ParserLoadLog
|
||||
from django.db import transaction
|
||||
from django.db.models.signals import post_delete, post_save
|
||||
from django.dispatch import receiver
|
||||
from registers.models import (
|
||||
Organization as RegistryOrganization,
|
||||
)
|
||||
from registers.models import (
|
||||
Register,
|
||||
RegisterUpload,
|
||||
RegistryMembershipPeriod,
|
||||
)
|
||||
|
||||
from organizations.cache import invalidate_organization_api_cache
|
||||
from organizations.models import OrganizationDataSnapshot
|
||||
|
||||
SOURCE_UPDATE_STATUSES = {
|
||||
ParserLoadLog.Status.SUCCESS,
|
||||
ParserLoadLog.Status.SKIPPED,
|
||||
}
|
||||
|
||||
|
||||
def _invalidate_on_commit() -> None:
|
||||
transaction.on_commit(invalidate_organization_api_cache)
|
||||
|
||||
|
||||
@receiver(
|
||||
post_save, sender=ParserLoadLog, dispatch_uid="organizations_parser_load_save"
|
||||
)
|
||||
def invalidate_for_parser_load(sender, instance: ParserLoadLog, **kwargs) -> None:
|
||||
"""Invalidate when a parser source reaches a visible terminal state."""
|
||||
if instance.status in SOURCE_UPDATE_STATUSES:
|
||||
_invalidate_on_commit()
|
||||
|
||||
|
||||
@receiver(post_save, sender=Register, dispatch_uid="organizations_register_save")
|
||||
@receiver(post_delete, sender=Register, dispatch_uid="organizations_register_delete")
|
||||
@receiver(
|
||||
post_save,
|
||||
sender=RegistryOrganization,
|
||||
dispatch_uid="organizations_registry_organization_save",
|
||||
)
|
||||
@receiver(
|
||||
post_delete,
|
||||
sender=RegistryOrganization,
|
||||
dispatch_uid="organizations_registry_organization_delete",
|
||||
)
|
||||
@receiver(
|
||||
post_save,
|
||||
sender=RegistryMembershipPeriod,
|
||||
dispatch_uid="organizations_registry_membership_save",
|
||||
)
|
||||
@receiver(
|
||||
post_delete,
|
||||
sender=RegistryMembershipPeriod,
|
||||
dispatch_uid="organizations_registry_membership_delete",
|
||||
)
|
||||
@receiver(
|
||||
post_save,
|
||||
sender=OrganizationDataSnapshot,
|
||||
dispatch_uid="organizations_data_snapshot_save",
|
||||
)
|
||||
@receiver(
|
||||
post_delete,
|
||||
sender=OrganizationDataSnapshot,
|
||||
dispatch_uid="organizations_data_snapshot_delete",
|
||||
)
|
||||
def invalidate_for_registry_or_snapshot_change(sender, **kwargs) -> None:
|
||||
"""Invalidate for direct registry and snapshot writes."""
|
||||
_invalidate_on_commit()
|
||||
|
||||
|
||||
@receiver(
|
||||
post_save, sender=RegisterUpload, dispatch_uid="organizations_register_upload_save"
|
||||
)
|
||||
def invalidate_for_successful_register_upload(
|
||||
sender,
|
||||
instance: RegisterUpload,
|
||||
created: bool,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
"""Invalidate once a registry import has completed successfully."""
|
||||
if created or instance.import_status != RegisterUpload.ImportStatus.SUCCESS:
|
||||
return
|
||||
_invalidate_on_commit()
|
||||
@@ -5,8 +5,8 @@ from __future__ import annotations
|
||||
import logging
|
||||
|
||||
from celery import shared_task
|
||||
from django.core.cache import cache
|
||||
|
||||
from organizations.cache import invalidate_organization_api_cache
|
||||
from organizations.services import OrganizationDataSnapshotRefreshService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -16,7 +16,7 @@ logger = logging.getLogger(__name__)
|
||||
def refresh_all_organization_data_snapshots(batch_size: int = 100) -> dict:
|
||||
"""Refresh all organization data snapshots for API v2."""
|
||||
result = OrganizationDataSnapshotRefreshService.refresh(batch_size=batch_size)
|
||||
cache.clear()
|
||||
invalidate_organization_api_cache()
|
||||
payload = {
|
||||
"processed": result.processed,
|
||||
"created": result.created,
|
||||
@@ -39,7 +39,7 @@ def refresh_organization_data_snapshots_for_parser_batch(
|
||||
batch_id=batch_id,
|
||||
batch_size=batch_size,
|
||||
)
|
||||
cache.clear()
|
||||
invalidate_organization_api_cache()
|
||||
payload = {
|
||||
"source": source,
|
||||
"batch_id": batch_id,
|
||||
|
||||
@@ -23,11 +23,15 @@ from organizations.api_enrichment import (
|
||||
to_api_data_source,
|
||||
to_internal_data_source,
|
||||
)
|
||||
from organizations.cache import (
|
||||
DEFAULT_ORGANIZATION_API_CACHE_TIMEOUT_SECONDS,
|
||||
ORGANIZATION_API_CACHE_PREFIX,
|
||||
get_organization_api_cache_version,
|
||||
)
|
||||
from organizations.filters import OrganizationFilter
|
||||
from organizations.models import Organization
|
||||
from organizations.serializers import OrganizationSerializer
|
||||
|
||||
ORGANIZATIONS_API_CACHE_TIMEOUT_SECONDS = 300
|
||||
ORGANIZATIONS_TAG = swagger_tag("Организации", "Organizations")
|
||||
ORGANIZATION_DATA_SOURCE_KEYS = ", ".join(sorted(API_DATA_SOURCE_KEY_SET))
|
||||
|
||||
@@ -59,7 +63,9 @@ ORGANIZATION_DATA_PARAMS = [
|
||||
description=(
|
||||
"Ограничить блок data одним или несколькими источниками. "
|
||||
f"Допустимые значения: {ORGANIZATION_DATA_SOURCE_KEYS}. "
|
||||
"Можно передать несколько параметров или CSV-строку."
|
||||
"Можно передать несколько параметров или CSV-строку. "
|
||||
"На list endpoint блок data по умолчанию пустой; передайте этот "
|
||||
"параметр, чтобы вернуть данные источников."
|
||||
),
|
||||
),
|
||||
_query_parameter(
|
||||
@@ -238,17 +244,28 @@ ORGANIZATION_DETAIL_RESPONSE = openapi.Response(
|
||||
class CachedReadOnlyMixin:
|
||||
"""Cache successful GET list/retrieve responses by full request path."""
|
||||
|
||||
cache_timeout = ORGANIZATIONS_API_CACHE_TIMEOUT_SECONDS
|
||||
cache_key_prefix = "api:v2:organizations"
|
||||
cache_timeout = DEFAULT_ORGANIZATION_API_CACHE_TIMEOUT_SECONDS
|
||||
cache_key_prefix = ORGANIZATION_API_CACHE_PREFIX
|
||||
|
||||
def _cache_timeout(self) -> int:
|
||||
return getattr(
|
||||
settings,
|
||||
"ORGANIZATIONS_API_CACHE_TIMEOUT_SECONDS",
|
||||
self.cache_timeout,
|
||||
)
|
||||
|
||||
def _build_cache_key(self, request) -> str:
|
||||
user_marker = "anonymous"
|
||||
if request.user and request.user.is_authenticated:
|
||||
user_marker = "authenticated"
|
||||
|
||||
raw_key = f"{request.method}:{request.get_full_path()}:{user_marker}"
|
||||
cache_version = get_organization_api_cache_version()
|
||||
raw_key = (
|
||||
f"v{cache_version}:{request.method}:"
|
||||
f"{request.get_full_path()}:{user_marker}"
|
||||
)
|
||||
digest = hashlib.md5(raw_key.encode(), usedforsecurity=False).hexdigest()
|
||||
return f"{self.cache_key_prefix}:{digest}"
|
||||
return f"{self.cache_key_prefix}:v{cache_version}:{digest}"
|
||||
|
||||
def _cached_response(self, request, producer) -> Response:
|
||||
cache_key = self._build_cache_key(request)
|
||||
@@ -260,7 +277,7 @@ class CachedReadOnlyMixin:
|
||||
|
||||
response = producer()
|
||||
if 200 <= response.status_code < 300:
|
||||
cache.set(cache_key, response.data, timeout=self.cache_timeout)
|
||||
cache.set(cache_key, response.data, timeout=self._cache_timeout())
|
||||
response["X-Cache"] = "MISS"
|
||||
return response
|
||||
|
||||
@@ -312,7 +329,9 @@ class OrganizationViewSet(CachedReadOnlyMixin, ReadOnlyModelViewSet):
|
||||
"По умолчанию показывает только организации с активным участием "
|
||||
"в реестрах; передайте has_registry=false, чтобы снять это ограничение. "
|
||||
"Поддерживает пагинацию, поиск по наименованию и реквизитам, фильтры "
|
||||
"по реестрам и наличию данных по источникам."
|
||||
"по реестрам и наличию данных по источникам. Для list endpoint "
|
||||
"тяжелый блок data по умолчанию пустой; передайте data/data_sources, "
|
||||
"чтобы вернуть данные конкретных источников."
|
||||
),
|
||||
manual_parameters=ORGANIZATION_LIST_PARAMS,
|
||||
responses={200: ORGANIZATION_LIST_RESPONSE},
|
||||
@@ -343,7 +362,7 @@ class OrganizationViewSet(CachedReadOnlyMixin, ReadOnlyModelViewSet):
|
||||
|
||||
def _list_with_enrichment(self, request, *args: Any, **kwargs: Any) -> Response:
|
||||
queryset = self.filter_queryset(self.get_queryset())
|
||||
data_sources = self._parse_data_sources(request)
|
||||
data_sources = self._parse_data_sources(request, default=set())
|
||||
|
||||
page = self.paginate_queryset(queryset)
|
||||
if page is not None:
|
||||
@@ -419,7 +438,11 @@ class OrganizationViewSet(CachedReadOnlyMixin, ReadOnlyModelViewSet):
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _parse_data_sources(request) -> set[str] | None:
|
||||
def _parse_data_sources(
|
||||
request,
|
||||
*,
|
||||
default: set[str] | None = None,
|
||||
) -> set[str] | None:
|
||||
included = _query_param_values(request, "data", "data_sources")
|
||||
excluded = _query_param_values(request, "exclude_data", "exclude_data_sources")
|
||||
|
||||
@@ -443,7 +466,7 @@ class OrganizationViewSet(CachedReadOnlyMixin, ReadOnlyModelViewSet):
|
||||
}
|
||||
if excluded:
|
||||
return API_DATA_SOURCE_KEY_SET - excluded
|
||||
return None
|
||||
return default
|
||||
|
||||
|
||||
def _query_param_values(request, *names: str) -> set[str]:
|
||||
|
||||
@@ -171,6 +171,7 @@ MIDDLEWARE = [
|
||||
"django.middleware.common.CommonMiddleware",
|
||||
"apps.core.middleware.ApiCsrfExemptMiddleware",
|
||||
"django.contrib.auth.middleware.AuthenticationMiddleware",
|
||||
"apps.core.middleware.OrganizationApiMetricsMiddleware",
|
||||
"django.contrib.messages.middleware.MessageMiddleware",
|
||||
"django.middleware.clickjacking.XFrameOptionsMiddleware",
|
||||
]
|
||||
@@ -381,6 +382,10 @@ LOGGING = {
|
||||
"format": "{levelname} {message}",
|
||||
"style": "{",
|
||||
},
|
||||
"metrics": {
|
||||
"format": "{message}",
|
||||
"style": "{",
|
||||
},
|
||||
},
|
||||
"handlers": {
|
||||
"file": {
|
||||
@@ -389,6 +394,14 @@ LOGGING = {
|
||||
"filename": PROJECT_ROOT / "logs/django.log",
|
||||
"formatter": "verbose",
|
||||
},
|
||||
"organizations_api_metrics_file": {
|
||||
"level": "INFO",
|
||||
"class": "logging.handlers.RotatingFileHandler",
|
||||
"filename": PROJECT_ROOT / "logs/organizations_api_metrics.log",
|
||||
"maxBytes": 20 * 1024 * 1024,
|
||||
"backupCount": 5,
|
||||
"formatter": "metrics",
|
||||
},
|
||||
"console": {
|
||||
"level": "INFO",
|
||||
"class": "logging.StreamHandler",
|
||||
@@ -405,6 +418,11 @@ LOGGING = {
|
||||
"level": "INFO",
|
||||
"propagate": False,
|
||||
},
|
||||
"organizations.api.metrics": {
|
||||
"handlers": ["organizations_api_metrics_file"],
|
||||
"level": "INFO",
|
||||
"propagate": False,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -95,6 +95,10 @@ LOGGING = {
|
||||
"format": "{levelname} {asctime} {module} {process:d} {thread:d} {message}",
|
||||
"style": "{",
|
||||
},
|
||||
"metrics": {
|
||||
"format": "{message}",
|
||||
"style": "{",
|
||||
},
|
||||
},
|
||||
"handlers": {
|
||||
"console": {
|
||||
@@ -102,6 +106,14 @@ LOGGING = {
|
||||
"class": "logging.StreamHandler",
|
||||
"formatter": "verbose",
|
||||
},
|
||||
"organizations_api_metrics_file": {
|
||||
"level": "INFO",
|
||||
"class": "logging.handlers.RotatingFileHandler",
|
||||
"filename": PROJECT_ROOT / "logs/organizations_api_metrics.log",
|
||||
"maxBytes": 20 * 1024 * 1024,
|
||||
"backupCount": 5,
|
||||
"formatter": "metrics",
|
||||
},
|
||||
},
|
||||
"root": {
|
||||
"handlers": ["console"],
|
||||
@@ -113,5 +125,10 @@ LOGGING = {
|
||||
"level": "INFO",
|
||||
"propagate": False,
|
||||
},
|
||||
"organizations.api.metrics": {
|
||||
"handlers": ["organizations_api_metrics_file"],
|
||||
"level": "INFO",
|
||||
"propagate": False,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user