Source code for nextorm.collection

"""RelatedCollection — lazy collection returned by ``Set[T]`` relation access.

Usage::

    post = db.select(Post).fetch_one()
    post.comments  # → RelatedCollection[Comment]
    post.comments.count()  # → int (SELECT COUNT(*))
    for c in post.comments:
        print(c.text)  # lazy-loads all comments on first iteration

    post.comments.add(Comment(text="hello"))
    post.comments.remove(comment)
    post.comments.clear()
"""

from __future__ import annotations

from collections.abc import Iterator  # noqa: TC003
from typing import TYPE_CHECKING, Any, TypeVar, cast

if TYPE_CHECKING:
    from nextorm.database import Database
    from nextorm.entity import Entity, RelationInfo
    from nextorm.query import QuerySet

__all__ = ["RelatedCollection"]

T = TypeVar("T", bound="Entity")


[docs] class RelatedCollection[T: "Entity"]: """A lazy, database-backed collection representing one side of a relation. Instances are created by :class:`~nextorm.entity.SetDescriptor` on first attribute access. They hold a weak logical reference to the owner entity so that individual collections can be GC'd when not needed. The collection is *not* thread-safe. Use separate collection objects in separate threads. Parameters ---------- owner: The entity instance that owns this relation attribute. ri: The :class:`~nextorm.entity.RelationInfo` for this relation. db: The :class:`~nextorm.database.Database` to query. ``None`` when the owner entity was not loaded from a database (e.g. a freshly created but unsaved entity). """
[docs] def __init__( self, owner: Entity, ri: RelationInfo, db: Database | None, ) -> None: self._owner = owner self._ri = ri self._db = db self._cache: list[T] | None = None
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _require_db(self) -> Database: if self._db is None: raise RuntimeError( f"Cannot access related collection '{self._ri.name}': " "the owner entity has no database context. " "Load it via db.select(...).fetch_all() first." ) return self._db def _owner_pk(self) -> Any: from nextorm.database import _get_pk_val # noqa: PLC0415 pk = _get_pk_val(self._owner) assert pk is not None, "Owner entity has no primary key" return pk def _resolve_target(self) -> type[Entity]: """Return the concrete target entity class.""" from nextorm.entity import _resolve_entity_target # noqa: PLC0415 target = self._ri.spec.target resolved = _resolve_entity_target(target) if resolved is None: tname = target if isinstance(target, str) else repr(target) raise RuntimeError( f"Cannot resolve forward-reference target '{tname}'. " "Ensure the entity is defined and registered." ) return cast("type[Entity]", resolved) def _is_m2m(self) -> bool: """Return True when the relation is many-to-many (join table exists).""" from nextorm.entity import _matches_entity # noqa: PLC0415 from nextorm.fields import RelationKind # noqa: PLC0415 target_cls = self._resolve_target() owner_cls = type(self._owner) return any( r.spec.kind == RelationKind.SET and _matches_entity(r.spec.target, owner_cls) for r in target_cls._relations_.values() ) def _join_table_name(self) -> str: target_cls = self._resolve_target() owner_table = type(self._owner)._table_name_ target_table = target_cls._table_name_ return "_".join(sorted([owner_table, target_table])) def _build_queryset(self) -> QuerySet[T]: """Return a :class:`~nextorm.query.QuerySet` scoped to this collection.""" db = self._require_db() target_cls = self._resolve_target() qs: QuerySet[T] = db.select(target_cls) # type: ignore[arg-type] if self._is_m2m(): # M2M — join through the join table join_table = self._join_table_name() owner_table = type(self._owner)._table_name_ owner_pk = self._owner_pk() target_table = target_cls._table_name_ from nextorm.sql.nodes import BinOp, ColumnRef, Param # noqa: PLC0415 # SELECT target.* FROM target JOIN <join> ON target.id = <join>.target_id # WHERE <join>.owner_id = <pk> join_cond = BinOp( ColumnRef("id", target_table), "=", ColumnRef(f"{target_table}_id", join_table), ) owner_cond = BinOp( ColumnRef(f"{owner_table}_id", join_table), "=", Param(value=owner_pk), ) return qs.join(join_table, join_cond).filter(owner_cond) else: # O2M — filter by FK column on target side owner_cls = type(self._owner) owner_table = owner_cls._table_name_ owner_pk = self._owner_pk() # Find the Single back-ref on target pointing at us from nextorm.entity import _matches_entity # noqa: PLC0415 from nextorm.fields import RelationKind # noqa: PLC0415 back_ref = next( ( r for r in target_cls._relations_.values() if r.spec.kind == RelationKind.SINGLE and _matches_entity(r.spec.target, owner_cls) ), None, ) if back_ref is None: raise RuntimeError( f"Cannot find Single back-reference on " f"{target_cls.__name__} pointing at {owner_cls.__name__}. " "Ensure a Single[...] attribute is declared on the target." ) fk_col = f"{back_ref.name}_id" from nextorm.sql.nodes import BinOp, ColumnRef, Param # noqa: PLC0415 return qs.filter(BinOp(ColumnRef(fk_col), "=", Param(value=owner_pk))) def _ensure_loaded(self) -> list[T]: """Load all items into the cache and return it.""" if self._cache is None: self._cache = self._build_queryset().fetch_all() return self._cache def _invalidate(self) -> None: """Invalidate the in-memory cache so the next access re-queries.""" self._cache = None # ------------------------------------------------------------------ # Sequence-like interface # ------------------------------------------------------------------ def __iter__(self) -> Iterator[T]: """Iterate over all related objects, loading them from the database if necessary.""" yield from self._ensure_loaded() def __len__(self) -> int: """Return the number of related objects via ``SELECT COUNT(*)``.""" return self.count() def __contains__(self, item: object) -> bool: target_cls = self._resolve_target() if not isinstance(item, target_cls): return False from nextorm.database import _get_pk_val # noqa: PLC0415 pk = _get_pk_val(item) if pk is None: return False if self._is_m2m(): join_table = self._join_table_name() owner_table = type(self._owner)._table_name_ target_table = target_cls._table_name_ owner_pk = self._owner_pk() db = self._require_db() from nextorm.sql.nodes import BinOp, ColumnRef, Param, Select, Star # noqa: PLC0415 stmt = Select( columns=(Star(),), from_table=join_table, where=BinOp( BinOp( ColumnRef(f"{owner_table}_id"), "=", Param(value=owner_pk), ), "AND", BinOp( ColumnRef(f"{target_table}_id"), "=", Param(value=pk), ), ), limit=1, ) assert db._builder is not None sql, params = db._builder.render(stmt) return bool(db._execute(sql, params)) else: # Check whether the FK column on target points to our owner back_ref_name = self._ri.spec.reverse if back_ref_name is None: owner_cls = type(self._owner) from nextorm.entity import _matches_entity as _me # noqa: PLC0415 from nextorm.fields import RelationKind # noqa: PLC0415 back_ref = next( ( r for r in target_cls._relations_.values() if r.spec.kind == RelationKind.SINGLE and _me(r.spec.target, owner_cls) ), None, ) if back_ref is None: return False back_ref_name = back_ref.name br_id = f"_{back_ref_name}_id" br_obj = f"_{back_ref_name}_obj" stored = item.__dict__.get(br_id) or item.__dict__.get(br_obj) if stored is None: return False if isinstance(stored, int): return bool(stored == self._owner_pk()) from nextorm.database import _get_pk_val as _gpv # noqa: PLC0415 from nextorm.entity import Entity as _Entity # noqa: PLC0415 stored_pk = _gpv(stored) if isinstance(stored, _Entity) else None return bool(stored_pk == self._owner_pk()) # ------------------------------------------------------------------ # Query methods # ------------------------------------------------------------------
[docs] def count(self) -> int: """Return the number of items without loading them.""" return self._build_queryset().count()
[docs] def is_empty(self) -> bool: """Return ``True`` if the collection has no items.""" return not self._build_queryset().exists()
[docs] def copy(self) -> set[T]: """Return a plain Python ``set`` of all loaded items.""" return set(self._ensure_loaded())
[docs] def load(self) -> list[T]: """Eagerly load all items and return them as a list.""" return self._ensure_loaded()
[docs] def select(self) -> QuerySet[T]: """Return a :class:`~nextorm.query.QuerySet` scoped to this collection.""" return self._build_queryset()
[docs] def filter(self, *conditions: Any) -> QuerySet[T]: """Return a filtered :class:`~nextorm.query.QuerySet` for this collection.""" return self._build_queryset().filter(*conditions)
[docs] def order_by(self, *items: Any) -> QuerySet[T]: """Return an ordered :class:`~nextorm.query.QuerySet` for this collection.""" return self._build_queryset().order_by(*items)
[docs] def page(self, pagenum: int, pagesize: int = 10) -> QuerySet[T]: """Return a page of this collection's items (1-based page numbers).""" return self._build_queryset().page(pagenum, pagesize)
[docs] def random(self, n: int) -> QuerySet[T]: """Return *n* randomly selected items from this collection.""" return self._build_queryset().random(n)
# ------------------------------------------------------------------ # Mutation methods # ------------------------------------------------------------------
[docs] def add(self, *items: T) -> None: """Add one or more items to this collection. For M2M: inserts rows into the join table. For O2M: updates the FK column on each item. """ db = self._require_db() owner_pk = self._owner_pk() owner_cls = type(self._owner) if self._is_m2m(): join_table = self._join_table_name() owner_table = owner_cls._table_name_ target_cls = self._resolve_target() target_table = target_cls._table_name_ from nextorm.database import _get_pk_val as _gpv # noqa: PLC0415 from nextorm.sql.nodes import Insert, Param # noqa: PLC0415 owner_col = f"{owner_table}_id" target_col = f"{target_table}_id" assert db._builder is not None for item in items: item_pk = _gpv(item) stmt = Insert( table=join_table, columns=(owner_col, target_col), values=(Param(value=owner_pk), Param(value=item_pk)), ) sql, params = db._builder.render(stmt) db._execute_dml(sql, params) else: target_cls = self._resolve_target() from nextorm.entity import _matches_entity as _me # noqa: PLC0415 from nextorm.fields import RelationKind # noqa: PLC0415 back_ref = next( ( r for r in target_cls._relations_.values() if r.spec.kind == RelationKind.SINGLE and _me(r.spec.target, owner_cls) ), None, ) if back_ref is None: raise RuntimeError( f"Cannot add to {self._ri.name}: no Single back-reference found." ) for item in items: setattr(item, back_ref.name, self._owner) db.save(item) self._invalidate()
[docs] def remove(self, *items: T) -> None: """Remove one or more items from this collection. For M2M: deletes rows from the join table. For O2M: sets the FK column to NULL (requires nullable FK). """ db = self._require_db() owner_pk = self._owner_pk() owner_cls = type(self._owner) if self._is_m2m(): join_table = self._join_table_name() owner_table = owner_cls._table_name_ target_cls = self._resolve_target() target_table = target_cls._table_name_ from nextorm.database import _get_pk_val as _gpv # noqa: PLC0415 from nextorm.sql.nodes import BinOp, ColumnRef, Delete, Param # noqa: PLC0415 owner_col = f"{owner_table}_id" target_col = f"{target_table}_id" assert db._builder is not None for item in items: item_pk = _gpv(item) stmt = Delete( table=join_table, where=BinOp( BinOp(ColumnRef(owner_col), "=", Param(value=owner_pk)), "AND", BinOp(ColumnRef(target_col), "=", Param(value=item_pk)), ), ) sql, params = db._builder.render(stmt) db._execute_dml(sql, params) else: target_cls = self._resolve_target() from nextorm.entity import _matches_entity as _me # noqa: PLC0415 from nextorm.fields import RelationKind # noqa: PLC0415 back_ref = next( ( r for r in target_cls._relations_.values() if r.spec.kind == RelationKind.SINGLE and r.spec.nullable and _me(r.spec.target, owner_cls) ), None, ) if back_ref is None: raise RuntimeError( f"Cannot remove from {self._ri.name}: no nullable Single " "back-reference found. Declare Single[...| None] on the target." ) for item in items: setattr(item, back_ref.name, None) db.save(item) self._invalidate()
[docs] def clear(self) -> None: """Remove all items from this collection.""" db = self._require_db() owner_pk = self._owner_pk() owner_cls = type(self._owner) if self._is_m2m(): join_table = self._join_table_name() owner_table = owner_cls._table_name_ from nextorm.sql.nodes import BinOp, ColumnRef, Delete, Param # noqa: PLC0415 owner_col = f"{owner_table}_id" assert db._builder is not None stmt = Delete( table=join_table, where=BinOp(ColumnRef(owner_col), "=", Param(value=owner_pk)), ) sql, params = db._builder.render(stmt) db._execute_dml(sql, params) else: target_cls = self._resolve_target() from nextorm.entity import _matches_entity as _me # noqa: PLC0415 from nextorm.fields import RelationKind # noqa: PLC0415 back_ref = next( ( r for r in target_cls._relations_.values() if r.spec.kind == RelationKind.SINGLE and r.spec.nullable and _me(r.spec.target, owner_cls) ), None, ) if back_ref is None: raise RuntimeError( f"Cannot clear {self._ri.name}: no nullable Single " "back-reference found. Declare Single[...| None] on the target." ) fk_col = f"{back_ref.name}_id" from nextorm.sql.nodes import BinOp, ColumnRef, Literal, Param, Update # noqa: PLC0415 assert db._builder is not None stmt_u = Update( table=target_cls._table_name_, assignments=((fk_col, Literal(None)),), where=BinOp(ColumnRef(fk_col), "=", Param(value=owner_pk)), ) sql, params = db._builder.render(stmt_u) db._execute_dml(sql, params) self._invalidate()
[docs] def create(self, **kwargs: Any) -> T: """Create a related entity and immediately link it to this collection. The entity is saved to the database first (to obtain a primary key), then linked via :meth:`add`. Returns the newly created entity. """ db = self._require_db() target_cls = self._resolve_target() item: T = cast("T", target_cls(**kwargs)) # Save the item first so it gets a primary key (required for M2M join inserts # and for O2M where add() will UPDATE the FK column). db.save(item) self.add(item) return item
# ------------------------------------------------------------------ # Repr # ------------------------------------------------------------------ def __repr__(self) -> str: from nextorm.entity import _target_name # noqa: PLC0415 target = self._ri.spec.target target_label = _target_name(target) or repr(target) owner_repr = repr(self._owner) return f"RelatedCollection[{target_label}]({owner_repr}.{self._ri.name})"