Postgres¶
Haiway ships with a context-aware Postgres integration that wraps asyncpg
, exposes typed row
helpers, and coordinates schema migrations through the state system. The feature keeps the
framework's functional style while handling connection pooling and transactions for you.
Overview¶
- Context Managed: Acquire connections through Haiway scopes to ensure cleanup
- Typed Accessors:
PostgresRow
exposes helpers for UUIDs, datetimes, and primitive types - Protocol Driven: Backends plug in via protocols, enabling custom clients in tests
- Migrations Included: Built-in runner discovers and executes ordered migration modules
- Immutable State: Connections and rows are immutable
State
objects with strict typing
Quick Start¶
Install the Postgres extra to pull in asyncpg
:
Use the provided PostgresConnectionPool
as a disposable resource:
from haiway import ctx
from haiway.postgres import Postgres, PostgresConnectionPool, PostgresRow
async with ctx.scope(
"postgres",
disposables=(PostgresConnectionPool(),),
):
await Postgres.execute(
"INSERT INTO users(email) VALUES($1)",
email,
)
row: PostgresRow | None = await Postgres.fetch_one(
"SELECT email FROM users WHERE id = $1",
user_id,
)
return None if row is None else row.get_str("email")
Configuration¶
Connection parameters are sourced from environment variables at import time. All values have sane defaults so the driver works out of the box:
Variable | Default | Description |
---|---|---|
POSTGRES_HOST |
localhost |
Server hostname |
POSTGRES_PORT |
5432 |
Server port |
POSTGRES_DATABASE |
postgres |
Database name |
POSTGRES_USER |
postgres |
Authentication user |
POSTGRES_PASSWORD |
postgres |
Authentication password |
POSTGRES_SSLMODE |
prefer |
Value forwarded to the pool ssl arg |
POSTGRES_CONNECTIONS |
1 |
Maximum number of open connections |
Provide custom environment variables or pass explicit keyword arguments to PostgresConnectionPool
when instantiating it to tweak connection parameters.
Working with Connections¶
Postgres
is a State
that exposes functional helpers: fetch
, fetch_one
, and execute
. When
called outside an existing connection scope the helpers acquire and release a connection
automatically. Inside a scope that already provides a PostgresConnection
, the helpers reuse the
instance and avoid nested acquisitions.
To run multiple statements on a single connection, acquire it explicitly:
async with ctx.scope("postgres", disposables=(PostgresConnectionPool(),)):
async with ctx.disposables(Postgres.acquire_connection()):
await Postgres.execute("SET search_path TO app")
rows = await Postgres.fetch("SELECT * FROM users")
Typed Rows¶
Every result row is wrapped in PostgresRow
, an immutable mapping that validates column access. Use
the helper methods to retrieve typed values:
row: PostgresRow | None = await Postgres.fetch_one("SELECT id, joined_at FROM users WHERE email = $1", email)
if row is not None:
user_id = row.get_int("id")
joined = row.get_datetime("joined_at")
The helpers raise TypeError
when the underlying value does not match the expected type, keeping
type assumptions honest at runtime.
Transactions¶
PostgresConnection.transaction()
returns an async context manager handling transaction
automatically:
async with ctx.scope("postgres", disposables=(PostgresConnectionPool(),)):
# make sure to acquire the connection for transaction and use PostgresConnection
async with ctx.disposables(Postgres.acquire_connection()):
async with PostgresConnection.transaction():
await PostgresConnection.execute("DELETE FROM jobs WHERE finished")
await PostgresConnection.execute("INSERT INTO audit(action) VALUES('cleanup')")
Any exception raised inside the block rolls back the transaction; successful execution commits the changes.
Migrations¶
The optional, lightweight migration runner executes callables conforming to PostgresMigrating
. You
can pass either a sequence of migrations or a dotted module path where submodules named
migration_<number>
expose a migration
coroutine.
async with ctx.scope("migrations", disposables=(PostgresConnectionPool(),)):
await Postgres.execute_migrations("my_app.db.migrations")
The runner ensures a migrations
table exists, reads the current version, and applies any pending
entries in numeric order. Each migration executes inside its own transaction and appends an entry to
the table once complete.
Error Handling¶
Unexpected execution failures raise PostgresException
:
from haiway.postgres import PostgresException
try:
await Postgres.fetch("SELECT * FROM missing_table")
except PostgresException as exc:
ctx.log_error("Postgres query failed", exception=exc)
Testing¶
Swap the default connection acquisition with a stub that records executed statements or returns
prepared data. Implement the connection protocols from haiway.postgres.types
to adapt in-memory
fixtures without touching a real database.
from haiway import ctx
from haiway.postgres.state import Postgres, PostgresConnection
class _NoopTransaction:
async def __aenter__(self) -> None:
return None
async def __aexit__(self, *_) -> None:
return None
class FakeConnectionContext:
def __init__(self):
self.statements = []
async def __aenter__(self) -> PostgresConnection:
async def execute(statement: str, /, *args):
self.statements.append((statement, args))
return tuple()
return PostgresConnection(
statement_executing=execute,
transaction_preparing=lambda: _NoopTransaction(),
)
async def __aexit__(self, *_) -> None:
return None
class FakePostgres:
def __init__(self, context: FakeConnectionContext):
self._context = context
async def __aenter__(self) -> Postgres:
return Postgres(connection_acquiring=lambda: self._context)
async def __aexit__(self, *_) -> None:
return None
async def test_insert():
connection_context = FakeConnectionContext()
async with ctx.scope("test", disposables=(FakePostgres(connection_context),)):
await Postgres.execute("INSERT INTO audit(action) VALUES($1)", "created")
assert connection_context.statements == [
("INSERT INTO audit(action) VALUES($1)", ("created",)),
]
Best Practices¶
Use scopes: Always run database operations within a context that manages disposables Group
statements: Acquire a connection explicitly when running related queries Validate data: Prefer
PostgresRow
accessors over direct subscripting Log migrations: Watch the logger output during
migration execution for progress tracking Surface errors: Catch PostgresException
to translate
into application-level failures