Async Usage¶
NextORM provides a full async API through AsyncDatabase.
Every method that touches the database has an async counterpart — you never need to
mix sync and async styles within a single AsyncDatabase.
AsyncDatabase¶
AsyncDatabase is the async mirror of
Database. Create and use it entirely with await:
import asyncio
from nextorm import AsyncDatabase, Entity, PK, Req, db_session
class Post(Entity):
id: PK[int]
title: Req[str]
body: Req[str]
async def main() -> None:
db = AsyncDatabase(entities=[Post])
await db.bind("sqlite", ":memory:")
await db.generate_mapping(create_tables=True)
# INSERT via session (auto-save, same as sync)
async with db_session:
p = Post(title="Hello", body="World")
# ← INSERT fires automatically on __aexit__; p.id is now available
print(p.id)
# Or save explicitly outside a session:
p2 = Post(title="World", body="Hello")
await db.asave(p2)
print(p2.id) # available after asave()
# SELECT
posts = await db.aselect(Post).filter(Post.title == "Hello").fetch_all()
# DELETE
await db.adelete_instance(posts[0])
await db.close()
asyncio.run(main())
Context manager¶
AsyncDatabase also works as an async context manager
that binds and closes the connection automatically:
async with db:
results = await db.aselect(Post).fetch_all()
CRUD methods¶
Method |
Description |
|---|---|
|
INSERT (new entity) or UPDATE (dirty entity). |
|
Force INSERT even if the entity already has a PK. |
|
DELETE the row for the given entity instance. |
|
Flush + COMMIT. |
|
ROLLBACK + clear identity map. |
|
Write pending changes without committing. |
Async queries¶
aselect() returns an
AsyncQuerySet. Every terminal method is a coroutine:
results = await db.aselect(Post).filter(Post.title == "Hello").fetch_all()
first = await db.aselect(Post).filter(Post.id == 1).fetch_one()
n = await db.aselect(Post).count()
exists = await db.aselect(Post).filter(Post.id == 99).exists()
deleted = await db.aselect(Post).filter(Post.draft == True).delete()
updated = await db.aselect(Post).filter(Post.draft == True).update(draft=False)
Class-level shortcuts¶
Just like the sync API, you can skip the explicit db reference:
# Entity.aselect() — equivalent to db.aselect(Entity)
results = await Post.aselect().filter(Post.draft == False).fetch_all()
# Entity.aget() — returns None if no match; raises if multiple rows match
post = await Post.aget(title="Hello")
# Primary key subscript works for sync sessions; use aget for async:
post = await Post.aget(id=1)
Async sessions¶
db_session() works as an async context manager and decorator:
from nextorm import db_session
async with db_session:
Post(title="Draft", body="…") # auto-scheduled for INSERT
# ← aflush() + acommit() fired automatically on __aexit__
@db_session
async def create_post(title: str, body: str) -> None:
Post(title=title, body=body)
Mixing sync and async databases¶
A single db_session context can span both
Database and
AsyncDatabase instances simultaneously.
The __aexit__ path of db_session() detects
database type at runtime and dispatches accordingly:
sync_db = Database(entities=[User])
async_db = AsyncDatabase(entities=[Post])
sync_db.bind("sqlite", "users.db")
sync_db.generate_mapping()
await async_db.bind("sqlite", "posts.db")
await async_db.generate_mapping()
async with db_session:
User(name="alice") # tracked by sync_db
Post(title="Hello", body="…") # tracked by async_db
# ← flushes sync and async DBs, then commits both primary-first
Connection providers¶
AsyncDatabase supports all three providers:
await db.bind("sqlite", ":memory:")
# or a file:
await db.bind("sqlite", "/tmp/app.db")
await db.bind(
"postgres",
host="localhost",
port=5432,
dbname="myapp",
user="app",
password="secret",
)
await db.bind(
"mariadb",
host="localhost",
user="app",
password="secret",
db="myapp",
)
Async connection pool¶
Enable connection pooling by passing pool parameters to bind:
await db.bind(
"postgres",
host="localhost", dbname="myapp", user="app", password="s3cr3t",
pool_min=2, pool_max=10, pool_timeout=30.0,
)
See Connection Pooling for details.
Raw SQL¶
rows_affected = await db.execute("UPDATE post SET draft = FALSE WHERE published < ?", [cutoff])
rows = await db.select_raw("SELECT id, title FROM post WHERE draft = ?", [False])