Source code for nextorm.pool

"""Connection pool — thread-safe pool for sync database connections.

A :class:`ConnectionPool` pre-creates *min_size* connections at construction
time and allows up to *max_size* concurrent checkouts.  Connections are
returned to the pool after each use; if *max_size* is already checked out the
caller waits up to *timeout* seconds before a :exc:`PoolTimeoutError` is
raised.

Usage::

    from nextorm import Database

    db = Database(entities=[User])
    db.bind("sqlite", "app.db", pool_min=1, pool_max=5, pool_timeout=30.0)
    db.generate_mapping()

    # Both styles of usage transparently pull a connection from the pool:
    results = db.select(User).fetch_all()

Async counterpart: :class:`AsyncConnectionPool`.
"""

from __future__ import annotations

import queue
import threading
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from collections.abc import Callable

__all__ = ["ConnectionPool", "AsyncConnectionPool", "PoolTimeoutError"]


[docs] class PoolTimeoutError(Exception): """Raised when all pool connections are in use and the wait times out."""
[docs] class ConnectionPool: """Thread-safe synchronous connection pool. Parameters ---------- factory: Zero-argument callable that creates and returns a new connection. min_size: Number of connections to pre-create at construction time (0 is valid). max_size: Maximum total number of connections. Once reached, :meth:`acquire` blocks until a connection is released or *timeout* expires. timeout: Seconds to wait for a free connection before raising :exc:`PoolTimeoutError`. """
[docs] def __init__( self, factory: Callable[[], Any], *, min_size: int = 1, max_size: int = 5, timeout: float = 30.0, ) -> None: if min_size < 0: raise ValueError("min_size must be >= 0") if max_size < 1: raise ValueError("max_size must be >= 1") if min_size > max_size: raise ValueError("min_size must be <= max_size") self._factory = factory self._max_size = max_size self._timeout = timeout self._lock = threading.Lock() self._queue: queue.Queue[Any] = queue.Queue() self._size = 0 # Pre-create min_size connections eagerly for _ in range(min_size): conn = self._create() self._queue.put(conn)
def _create(self) -> Any: """Create a new connection and increment the total size counter.""" conn = self._factory() with self._lock: self._size += 1 return conn
[docs] def acquire(self) -> Any: """Check out a connection from the pool. Returns immediately when an idle connection is available. Creates a new connection when the pool is below *max_size*. Blocks up to *timeout* seconds when the pool is at *max_size*. Raises ------ PoolTimeoutError If no connection becomes available within the timeout. """ # Fast path: try to get an idle connection without blocking try: return self._queue.get_nowait() except queue.Empty: pass # Slow path: try to create a new connection if below max_size with self._lock: if self._size < self._max_size: conn = self._factory() self._size += 1 return conn # Pool exhausted — wait for a release try: return self._queue.get(timeout=self._timeout) except queue.Empty: raise PoolTimeoutError( f"Connection pool exhausted: all {self._max_size} connections are in use " f"and no connection was released within {self._timeout}s." ) from None
[docs] def release(self, conn: Any) -> None: """Return *conn* to the pool for reuse.""" self._queue.put(conn)
[docs] def close_all(self) -> None: """Close all idle connections in the pool. Connections that are currently checked out are not affected; they will be closed when :meth:`release` is called and the pool detects that it is shutting down (currently the pool does not track checked-out connections, so callers must ensure all connections are released before calling this method if a clean shutdown is required). """ while True: try: conn = self._queue.get_nowait() conn.close() with self._lock: self._size -= 1 except queue.Empty: break
@property def pool_size(self) -> int: """Total number of connections created (idle + checked out).""" return self._size @property def idle_count(self) -> int: """Number of connections currently sitting idle in the pool.""" return self._queue.qsize()
[docs] class AsyncConnectionPool: """Async connection pool (uses :mod:`asyncio` synchronisation primitives). Parameters ---------- factory: Async callable that creates and returns a new async connection. min_size: Number of connections to pre-create during :meth:`start`. max_size: Maximum total number of connections. timeout: Seconds to wait for a free connection before raising :exc:`PoolTimeoutError`. """
[docs] def __init__( self, factory: Callable[[], Any], *, min_size: int = 1, max_size: int = 5, timeout: float = 30.0, ) -> None: if min_size < 0: raise ValueError("min_size must be >= 0") if max_size < 1: raise ValueError("max_size must be >= 1") if min_size > max_size: raise ValueError("min_size must be <= max_size") self._factory = factory self._min_size = min_size self._max_size = max_size self._timeout = timeout self._idle: list[Any] = [] self._size = 0 self._lock: Any = None # created lazily on first use (asyncio.Lock) self._semaphore: Any = None # created lazily (asyncio.Semaphore)
def _ensure_primitives(self) -> None: """Create asyncio synchronisation objects on first call (inside event loop).""" import asyncio # noqa: PLC0415 if self._lock is None: self._lock = asyncio.Lock() if self._semaphore is None: self._semaphore = asyncio.Semaphore(self._max_size)
[docs] async def start(self) -> None: """Pre-create *min_size* connections. Must be called inside an event loop.""" self._ensure_primitives() for _ in range(self._min_size): conn = await self._factory() self._idle.append(conn) self._size += 1
[docs] async def acquire(self) -> Any: """Check out an async connection from the pool. Raises ------ PoolTimeoutError If no connection becomes available within the timeout. """ import asyncio # noqa: PLC0415 self._ensure_primitives() try: await asyncio.wait_for(self._semaphore.acquire(), timeout=self._timeout) except TimeoutError: raise PoolTimeoutError( f"Async connection pool exhausted: all {self._max_size} connections are in use " f"and none was released within {self._timeout}s." ) from None async with self._lock: if self._idle: return self._idle.pop() conn = await self._factory() self._size += 1 return conn
[docs] async def release(self, conn: Any) -> None: """Return *conn* to the pool for reuse.""" self._ensure_primitives() async with self._lock: self._idle.append(conn) self._semaphore.release()
[docs] async def close_all(self) -> None: """Close all idle connections.""" self._ensure_primitives() async with self._lock: for conn in self._idle: await conn.close() self._size -= 1 self._idle.clear()
@property def pool_size(self) -> int: """Total number of connections created.""" return self._size @property def idle_count(self) -> int: """Number of idle connections.""" return len(self._idle)