Postgres¶
Haiway ships with a context-aware Postgres integration that wraps asyncpg, exposes typed row
helpers, and coordinates schema migrations through the state system. The feature keeps the
framework's functional style while handling connection pooling and transactions for you.
Overview¶
- Context Managed: Acquire connections through Haiway scopes to ensure cleanup
- Typed Accessors:
PostgresRow(an immutable mapping) exposes helpers for UUIDs, datetimes, and primitive types - Protocol Driven: Backends plug in via protocols, enabling custom clients in tests
- Migrations Included: Built-in runner discovers and executes ordered migration modules
- Configuration Storage: Optional
ConfigurationRepositorybacked by versioned Postgres rows - Immutable State: Connections are exposed as
State; rows are immutableMappingwrappers with strict typing helpers
Quick Start¶
Install the Postgres extra to pull in asyncpg:
Use the provided PostgresConnectionPool as a disposable resource:
from haiway import ctx
from haiway.postgres import Postgres, PostgresConnectionPool, PostgresRow
async with ctx.scope(
"postgres",
disposables=(PostgresConnectionPool(),),
):
await Postgres.execute(
"INSERT INTO users(email) VALUES($1)",
email,
)
row: PostgresRow | None = await Postgres.fetch_one(
"SELECT email FROM users WHERE id = $1",
user_id,
)
return None if row is None else row.get_str("email")
Configuration¶
Connection parameters are sourced from environment variables at import time. All values have sane defaults so the driver works out of the box. Because these values are read when the module is imported, changing the environment later does not affect already-imported defaults:
| Variable | Default | Description |
|---|---|---|
POSTGRES_HOST |
localhost |
Server hostname |
POSTGRES_PORT |
5432 |
Server port |
POSTGRES_DATABASE |
postgres |
Database name |
POSTGRES_USER |
postgres |
Authentication user |
POSTGRES_PASSWORD |
postgres |
Authentication password |
POSTGRES_SSLMODE |
prefer |
Value forwarded to the pool ssl arg |
POSTGRES_CONNECTIONS |
1 |
Maximum number of open connections |
Provide custom environment variables or pass explicit keyword arguments to PostgresConnectionPool
when instantiating it to tweak connection parameters.
To bootstrap from an existing connection string (DSN, or Data Source Name), use the of
constructor. The DSN can be provided positionally or as the dsn keyword:
pool = PostgresConnectionPool.of(
dsn="postgresql://analytics@db.internal:5432/events?sslmode=require&connections=6",
)
The helper parses the DSN, applies sane defaults for missing components, and respects query-string
overrides such as sslmode, ssl, connections, connection_limit, maxsize, or max_size.
Working with Connections¶
Postgres is a State that exposes functional helpers: fetch, fetch_one, and execute. When
called outside an existing connection scope the helpers acquire and release a connection
automatically. Inside a scope that already provides a PostgresConnection, the helpers reuse the
instance and avoid nested acquisitions. Explicit recursive calls to Postgres.acquire_connection()
from inside an existing connection scope raise RuntimeError.
To run multiple statements on a single connection, acquire it explicitly:
async with ctx.scope("postgres", disposables=(PostgresConnectionPool(),)):
async with ctx.disposables(Postgres.acquire_connection()):
await Postgres.execute("SET search_path TO app")
rows = await Postgres.fetch("SELECT * FROM users")
Typed Rows¶
Every result row is wrapped in PostgresRow, an immutable mapping that validates column access. Use
the helper methods to retrieve typed values:
row: PostgresRow | None = await Postgres.fetch_one("SELECT id, joined_at FROM users WHERE email = $1", email)
if row is not None:
user_id = row.get_int("id")
joined = row.get_datetime("joined_at")
The helpers raise TypeError when the underlying value does not match the expected type, keeping
type assumptions honest at runtime.
Transactions¶
PostgresConnection.transaction() returns an async context manager handling transaction
automatically:
from haiway.postgres import PostgresConnection
async with ctx.scope("postgres", disposables=(PostgresConnectionPool(),)):
async with ctx.disposables(Postgres.acquire_connection()):
async with PostgresConnection.transaction():
await PostgresConnection.execute("DELETE FROM jobs WHERE finished")
await PostgresConnection.execute("INSERT INTO audit(action) VALUES('cleanup')")
Any exception raised inside the block rolls back the transaction; successful execution commits the changes.
Migrations¶
The optional, lightweight migration runner executes callables conforming to PostgresMigrating. You
can pass either a sequence of migrations or a dotted module path where submodules named
migration_<number> expose a migration coroutine. Module names must use a continuous sequence
starting at migration_0; gaps or duplicate numbers raise ValueError.
async with ctx.scope("migrations", disposables=(PostgresConnectionPool(),)):
await Postgres.execute_migrations("my_app.db.migrations")
The runner ensures a migrations table exists, reads the current version, and applies any pending
entries in numeric order. Each migration executes inside its own transaction and appends an entry to
the table once complete.
Example package layout:
Each module should export an async migration(connection: PostgresConnection) -> None callable.
Configuration Repository¶
PostgresConfigurationRepository() adapts Haiway's generic ConfigurationRepository to a
Postgres-backed store. It persists immutable configuration snapshots in a configurations table and
uses in-memory caching for listing and loading operations.
from haiway import ConfigurationRepository, ctx
from haiway.postgres import PostgresConfigurationRepository, PostgresConnectionPool
async with ctx.scope(
"config",
PostgresConfigurationRepository(),
disposables=(PostgresConnectionPool(),),
):
available = await ConfigurationRepository.configurations()
The repository expects this schema to exist before use:
CREATE TABLE configurations (
identifier TEXT NOT NULL,
name TEXT NOT NULL,
content JSONB NOT NULL,
created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (identifier, created)
);
CREATE INDEX IF NOT EXISTS configurations_idx
ON configurations (identifier, created DESC);
Behavior summary:
listing(...)returns the newest distinct identifiers, optionally filtered by configuration typeloading(...)fetches the newest row for an identifier and reconstructs it withfrom_json(...)defining(...)inserts a new snapshot row instead of updating in placeremoving(...)deletes all rows for the identifier- successful writes clear the in-memory listing/loading caches
- cache behavior is configurable through
cache_limitandcache_expiration
Error Handling¶
Unexpected execution failures raise PostgresException:
from haiway.postgres import PostgresException
try:
await Postgres.fetch("SELECT * FROM missing_table")
except PostgresException as exc:
ctx.log_error("Postgres query failed", exception=exc)
Testing¶
Swap the default connection acquisition with a stub that records executed statements or returns
prepared data. Implement the connection protocols from haiway.postgres.types to adapt in-memory
fixtures without touching a real database.
from haiway import ctx
from haiway.postgres import Postgres, PostgresConnection
class _NoopTransaction:
async def __aenter__(self) -> None:
return None
async def __aexit__(self, *_) -> None:
return None
class FakeConnectionContext:
def __init__(self):
self.statements = []
async def __aenter__(self) -> PostgresConnection:
async def execute(statement: str, /, *args):
self.statements.append((statement, args))
return tuple()
return PostgresConnection(
statement_executing=execute,
transaction_preparing=lambda: _NoopTransaction(),
)
async def __aexit__(self, *_) -> None:
return None
class FakePostgres:
def __init__(self, context: FakeConnectionContext):
self._context = context
async def __aenter__(self) -> Postgres:
return Postgres(connection_acquiring=lambda: self._context)
async def __aexit__(self, *_) -> None:
return None
async def test_insert():
connection_context = FakeConnectionContext()
async with ctx.scope("test", disposables=(FakePostgres(connection_context),)):
await Postgres.execute("INSERT INTO audit(action) VALUES($1)", "created")
assert connection_context.statements == [
("INSERT INTO audit(action) VALUES($1)", ("created",)),
]
Best Practices¶
- Use
ctx.scope(...)orctx.disposables(...)so pools and acquired connections are cleaned up. - Acquire a connection explicitly when several statements must share one transaction or session.
- Prefer
PostgresRowaccessors over direct subscripting when the column type matters. - Keep migration modules numbered continuously from
migration_0. - Catch
PostgresExceptionat the application boundary and translate it into domain-specific errors.