Source code for nextorm.debug
"""Debug and diagnostic utilities for NextORM.
Provides SQL debug logging and per-instance / global query statistics.
Example::
from nextorm import set_sql_debug, sql_debugging
set_sql_debug(True)
results = db.select(User).fetch_all()
# prints: >>> SELECT id, name FROM "user"
# params: []
with sql_debugging():
result = db.select(User).filter(User.id == 1).fetch_one()
"""
from __future__ import annotations
import sys
import threading
from dataclasses import dataclass
from typing import IO, Any
__all__ = [
"set_sql_debug",
"sql_debugging",
"QueryStat",
"global_stats",
"clear_global_stats",
]
# ---------------------------------------------------------------------------
# SQL debug
# ---------------------------------------------------------------------------
_debug_lock = threading.Lock()
_debug_enabled: bool = False
[docs]
def set_sql_debug(debug: bool = True) -> None:
"""Enable or disable global SQL debug logging.
When enabled, every SQL statement executed by :class:`~nextorm.database.Database`
or :class:`~nextorm.async_database.AsyncDatabase` is printed to *stdout*
before execution. Use :class:`sql_debugging` as a context manager for
scoped temporary debugging.
Parameters
----------
debug:
``True`` to enable (default), ``False`` to disable.
"""
global _debug_enabled
with _debug_lock:
_debug_enabled = debug
[docs]
class sql_debugging:
"""Context manager that temporarily enables SQL debug output.
On exit the previous debug state is restored, regardless of exceptions::
with sql_debugging():
users = db.select(User).fetch_all()
# debug logging is off again here
"""
[docs]
def __init__(self) -> None:
self._previous: bool = False
def __enter__(self) -> sql_debugging:
global _debug_enabled
with _debug_lock:
self._previous = _debug_enabled
_debug_enabled = True
return self
def __exit__(self, *_: object) -> None:
global _debug_enabled
with _debug_lock:
_debug_enabled = self._previous
def _print_sql( # pyright: ignore[reportUnusedFunction]
sql: str,
params: list[Any],
*,
file: IO[str] | None = None,
) -> None:
"""Print *sql* and *params* to *file* (default: ``sys.stdout``) if debug is on."""
if not _debug_enabled:
return
out = file or sys.stdout
print(f">>> {sql}", file=out)
if params:
print(f" params: {params}", file=out)
# ---------------------------------------------------------------------------
# Query statistics
# ---------------------------------------------------------------------------
[docs]
@dataclass
class QueryStat:
"""Per-query-string execution statistics.
Attributes
----------
count:
Number of times the query was executed.
sum_time:
Total execution time in seconds.
min_time:
Minimum single execution time (``inf`` when *count* is 0).
max_time:
Maximum single execution time in seconds.
avg_time:
Average execution time in seconds (computed property).
"""
count: int = 0
sum_time: float = 0.0
min_time: float = float("inf")
max_time: float = 0.0
@property
def avg_time(self) -> float:
"""Average execution time in seconds."""
return self.sum_time / self.count if self.count else 0.0
def _record(self, elapsed: float) -> None:
"""Record one query execution taking *elapsed* seconds."""
self.count += 1
self.sum_time += elapsed
if elapsed < self.min_time:
self.min_time = elapsed
if elapsed > self.max_time:
self.max_time = elapsed
def _merge(self, other: QueryStat) -> None:
"""Merge all observations from *other* into this instance."""
if other.count == 0:
return
self.count += other.count
self.sum_time += other.sum_time
if other.min_time < self.min_time:
self.min_time = other.min_time
if other.max_time > self.max_time:
self.max_time = other.max_time
#: Module-level global query statistics.
#:
#: Populated by :meth:`~nextorm.database.Database.merge_local_stats`;
#: cleared by :func:`clear_global_stats`.
global_stats: dict[str, QueryStat] = {}
_global_stats_lock = threading.Lock()
[docs]
def clear_global_stats() -> None:
"""Clear the module-level :data:`global_stats` dictionary."""
with _global_stats_lock:
global_stats.clear()