Postgres integrations
Draive ships with Postgres-backed implementations for the most common persistence interfaces so you
can plug relational storage into your workflows without writing adapters. All helpers live in
draive.postgres and reuse the shared haiway.postgres.Postgres connection states.
Bootstrapping the Postgres context
Before using any adapter, ensure that a connection pool is available inside your context scope. The
helpers lean on PostgresConnectionPool and the Postgres facade exported from draive.postgres.
from draive import ctx
from draive.postgres import (
    Postgres,
    PostgresConnectionPool,
    PostgresConfigurationRepository,
    PostgresModelMemory,
    PostgresTemplatesRepository,
)

async with ctx.scope(
    "postgres-demo",
    PostgresConfigurationRepository(),  # use Postgres-backed configurations
    PostgresTemplatesRepository(),  # use Postgres-backed templates
    disposables=(
        # the pool is opened on scope entry and closed on scope exit
        PostgresConnectionPool.of(dsn="postgresql://draive:secret@localhost:5432/draive"),
    ),
):
    session_memory = PostgresModelMemory("demo-session")
Each adapter relies on the same connection scope, so you can freely mix them within a single context.
When working with pgvector-backed components, add the Python package pgvector to your environment
(pip install pgvector) and ensure every connection registers the codec. haiway ≥ 0.37.1 exposes an
initialize callback on PostgresConnectionPool.of(...) so you can point to the schema where the
extension is installed:
from asyncpg.connection import Connection
from pgvector.asyncpg import register_vector

async def initialize_pgvector(connection: Connection) -> None:
    # register the vector codec on each new connection
    await register_vector(connection, schema="public")

PostgresConnectionPool.of(
    dsn="postgresql://draive:secret@localhost:5432/draive",
    initialize=initialize_pgvector,
)
Reuse the same initializer across scopes to keep the codec registration consistent.
ConfigurationRepository implementation
PostgresConfigurationRepository persists configuration snapshots inside a configurations table
and keeps a bounded LRU cache to avoid repeated fetches. The table must expose the schema used in
the implementation:
CREATE TABLE configurations (
    identifier TEXT NOT NULL,
    content JSONB NOT NULL,
    created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (identifier, created)
);
Key capabilities:
- configurations() returns every known identifier using cached results (limit 1, default 10 minute TTL).
- load(config, identifier) fetches the newest JSON document per identifier and parses it into the requested configuration type.
- load_raw(identifier) fetches the raw Mapping for the given identifier.
- define(config) upserts a new configuration snapshot and clears both caches, guaranteeing fresh reads on the next call.
- remove(identifier) deletes all historical snapshots for the identifier and purges caches.
Tune memory pressure through cache_limit and cache_expiration arguments when instantiating the
repository.
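The caching behaviour described above (a bounded LRU cache whose entries expire, cleared on every write) can be illustrated with a small standalone sketch. The class below is hypothetical and is not the repository's actual cache; only the cache_limit and cache_expiration argument names come from this guide, and the injectable clock exists purely to make the example easy to test.

```python
import time
from collections import OrderedDict
from collections.abc import Callable, Hashable
from typing import Any


class ExpiringLRUCache:
    """Hypothetical bounded cache: LRU eviction plus a per-entry time to live."""

    def __init__(
        self,
        limit: int,
        expiration: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._limit = limit  # plays the role of cache_limit
        self._expiration = expiration  # plays the role of cache_expiration (seconds)
        self._clock = clock
        self._entries: OrderedDict[Hashable, tuple[float, Any]] = OrderedDict()

    def get(self, key: Hashable, load: Callable[[], Any]) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._expiration:
            # fresh hit: mark as most recently used and skip the loader
            self._entries.move_to_end(key)
            return entry[1]

        # miss or expired entry: load, store, then evict the oldest entries
        value = load()
        self._entries[key] = (now, value)
        self._entries.move_to_end(key)
        while len(self._entries) > self._limit:
            self._entries.popitem(last=False)
        return value

    def clear(self) -> None:
        # a write path such as define() or remove() would call this
        self._entries.clear()


cache = ExpiringLRUCache(limit=32, expiration=600.0)
settings = cache.get("model-config", lambda: {"temperature": 0.2})
```

A smaller limit lowers memory use at the cost of more database round trips, while a shorter expiration bounds how long a stale snapshot written by another process can be served.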
TemplatesRepository implementation
See the Templates guide for authoring patterns and runtime resolution examples.
PostgresTemplatesRepository mirrors the behaviour of the file-backed templates repository while
storing revisions inside a dedicated templates table:
CREATE TABLE templates (
    identifier TEXT NOT NULL,
    description TEXT DEFAULT NULL,
    content TEXT NOT NULL,
    variables JSONB NOT NULL DEFAULT '{}'::jsonb,
    meta JSONB NOT NULL DEFAULT '{}'::jsonb,
    created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (identifier, created)
);
Capabilities:
- templates() returns cached TemplateDeclaration objects reflecting the newest revision per identifier.
- resolve(template) and resolve_str(template) reuse a cached loader keyed by identifier to pull the latest template body before rendering arguments.
- define(template, content) persists a new revision, invalidates caches, and ensures subsequent reads see the updated payload.
Use this adapter whenever your multimodal templates live alongside other structured content in Postgres and you want on-demand caching with revision history.
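The revision model implied by the (identifier, created) primary key is append-only: defining a template adds a row, and reads pick the newest row per identifier. The sketch below shows that pattern with the standard-library sqlite3 module as a stand-in, so it runs without a Postgres server. The simplified table, the integer created column, and the define and latest helpers are all assumptions made for illustration; the queries the adapter actually issues are not shown in this guide.

```python
import sqlite3
from typing import Optional

# In-memory SQLite stands in for Postgres; the real table uses TIMESTAMPTZ and JSONB.
connection = sqlite3.connect(":memory:")
connection.execute(
    """
    CREATE TABLE templates (
        identifier TEXT NOT NULL,
        content TEXT NOT NULL,
        created INTEGER NOT NULL,
        PRIMARY KEY (identifier, created)
    )
    """
)


def define(identifier: str, content: str, created: int) -> None:
    # Every definition appends a revision instead of overwriting the previous one.
    connection.execute(
        "INSERT INTO templates (identifier, content, created) VALUES (?, ?, ?)",
        (identifier, content, created),
    )


def latest(identifier: str) -> Optional[str]:
    # The newest revision wins; older rows remain available as history.
    row = connection.execute(
        "SELECT content FROM templates WHERE identifier = ? "
        "ORDER BY created DESC LIMIT 1",
        (identifier,),
    ).fetchone()
    return row[0] if row else None


define("greeting", "Hello {name}", created=1)
define("greeting", "Hi {name}!", created=2)
current = latest("greeting")
```

Because old rows are never updated, rolling back amounts to defining the previous content again as a new revision.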
ModelMemory implementation
PostgresModelMemory enables durable conversational memory by persisting variables and context
elements in three tables sharing the same identifier:
CREATE TABLE memories (
    identifier TEXT NOT NULL,
    created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (identifier)
);

CREATE TABLE memories_variables (
    identifier TEXT NOT NULL REFERENCES memories (identifier) ON DELETE CASCADE,
    variables JSONB NOT NULL DEFAULT '{}'::jsonb,
    created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE memories_elements (
    identifier TEXT NOT NULL REFERENCES memories (identifier) ON DELETE CASCADE,
    content JSONB NOT NULL,
    created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
Capabilities:
- recall(limit=...) fetches the latest variables and replayable context elements (inputs/outputs) respecting the optional recall_limit supplied to the factory.
- remember(*items, variables=...) persists new context elements and optionally a fresh variable snapshot in a single transaction.
- maintenance(variables=...) ensures the base memories row exists and can seed default variables without appending messages.
Use the memory helper when you need stateful chat sessions, per-user progressive profiling, or
auditable interaction logs. Set recall_limit to bound the amount of context loaded back into
generation pipelines.
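To make the effect of recall_limit concrete, here is a minimal in-memory sketch of the recall and remember semantics described above. InMemorySession is a hypothetical stand-in rather than the PostgresModelMemory API, and the rule that an explicit limit takes precedence over recall_limit is an assumption of this example.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class InMemorySession:
    """Hypothetical stand-in for a memory that keeps elements and variable snapshots."""

    recall_limit: Optional[int] = None
    elements: list[str] = field(default_factory=list)
    snapshots: list[dict[str, Any]] = field(default_factory=list)

    def remember(self, *items: str, variables: Optional[dict[str, Any]] = None) -> None:
        # Elements are appended in order; variables are stored as a new snapshot.
        self.elements.extend(items)
        if variables is not None:
            self.snapshots.append(dict(variables))

    def recall(self, limit: Optional[int] = None) -> tuple[dict[str, Any], list[str]]:
        # Assumption: an explicit limit overrides the session-wide recall_limit.
        effective = limit if limit is not None else self.recall_limit
        variables = self.snapshots[-1] if self.snapshots else {}
        if effective is None:
            return variables, list(self.elements)
        if effective <= 0:
            return variables, []
        # Keep only the newest elements while preserving chronological order.
        return variables, self.elements[-effective:]


session = InMemorySession(recall_limit=2)
session.remember("What is pgvector?", "A Postgres extension.", variables={"user": "ada"})
session.remember("Thanks!", "You're welcome.")
variables, context = session.recall()
```

Only the newest elements are replayed, but they come back oldest first, so a generation pipeline still sees them in conversation order.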
VectorIndex implementation (pgvector)
The PostgresVectorIndex helper persists dense embeddings in Postgres using the
pgvector extension. Each indexed DataModel maps to its own
table, derived by converting the model class name to snake case (for example, Chunk → chunk).
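The class-name-to-table mapping can be approximated with a short helper. This is a sketch of a common snake-case conversion, not the function the library uses; the table_name helper is hypothetical, and only the Chunk → chunk case is confirmed by this guide. Names with several words or acronyms may be handled differently by the real implementation, so check the generated table name before writing migrations.

```python
import re


def table_name(model_name: str) -> str:
    # Insert an underscore before every capital letter except the first one,
    # then lowercase the result.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", model_name).lower()


chunk_table = table_name("Chunk")
document_table = table_name("DocumentChunk")
```

Note that this naive rule splits acronyms letter by letter, which is one reason to verify the name the library derives for such models.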
Enable pgvector and create tables
Install the extension once per database and create a table for every data model you plan to index.
The implementation expects three columns and manages timestamps automatically. For a Chunk model
the migration could look like this (adjust the vector dimension to match your embedding provider):
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE chunk (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    embedding VECTOR(1536) NOT NULL,
    payload JSONB NOT NULL,
    meta JSONB NOT NULL,
    created TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Optional ANN index (requires pgvector >= 0.4.0)
CREATE INDEX IF NOT EXISTS chunk_embedding_idx
    ON chunk
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
The helper stores the serialized DataModel instance inside payload, so the JSON schema mirrors
the model definition. It writes monotonically increasing created timestamps to preserve insertion
order for non-similarity queries.
Wiring the index
Construct the index with PostgresVectorIndex() and reuse the shared Postgres state inside an
active context scope. The optional mmr_multiplier argument controls how many rows are fetched
before applying Maximal Marginal Relevance re-ranking when rerank=True.
from collections.abc import Sequence
from typing import Annotated

from asyncpg.connection import Connection
from pgvector.asyncpg import register_vector

from draive import Alias, DataModel, ctx
from draive.postgres import PostgresConnectionPool, PostgresVectorIndex
from draive.utils import VectorIndex

class Chunk(DataModel):
    identifier: Annotated[str, Alias("id")]
    text: str

async def initialize_pgvector(connection: Connection) -> None:
    # schema must match the one where the vector extension is installed
    await register_vector(connection, schema="embeddings")

async with ctx.scope(
    "pgvector-demo",
    PostgresVectorIndex(),
    disposables=(
        PostgresConnectionPool.of(
            dsn="postgresql://draive:secret@localhost:5432/draive",
            initialize=initialize_pgvector,
        ),
    ),
):
    # embed the text attribute and store each Chunk in the chunk table
    await VectorIndex.index(
        Chunk,
        values=[Chunk(identifier="doc-1", text="hello world")],
        attribute=Chunk._.text,
    )

    results: Sequence[Chunk] = await VectorIndex.search(
        Chunk,
        query="hello",
        limit=3,
        score_threshold=0.6,  # optional cosine similarity cutoff
        rerank=True,
    )
Queries can be strings, TextContent, ResourceContent (text or image), or pre-computed vectors.
When score_threshold is provided the helper converts it to the cosine distance cutoff used by
pgvector. Set rerank=False to return rows ordered solely by the database similarity operator.
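The two ideas in this section, converting a similarity threshold into a distance cutoff and re-ranking an over-fetched candidate set with Maximal Marginal Relevance, can be sketched in plain Python. pgvector's cosine operator returns a distance equal to one minus the cosine similarity, which is what distance_cutoff relies on. Everything else here is an assumption for illustration: the function names, the balance parameter, and the limit * mmr_multiplier formula are not taken from the library.

```python
import math
from collections.abc import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def distance_cutoff(score_threshold: float) -> float:
    # Cosine distance is 1 - cosine similarity, so a minimum similarity
    # becomes a maximum distance.
    return 1.0 - score_threshold


def candidate_count(limit: int, mmr_multiplier: int) -> int:
    # Assumed over-fetch rule: pull several times more rows than requested.
    return limit * mmr_multiplier


def mmr(
    query: Sequence[float],
    candidates: Sequence[Sequence[float]],
    limit: int,
    balance: float = 0.5,
) -> list[int]:
    """Return candidate indices chosen greedily by Maximal Marginal Relevance."""
    remaining = list(range(len(candidates)))
    selected: list[int] = []
    while remaining and len(selected) < limit:

        def score(index: int) -> float:
            relevance = cosine_similarity(query, candidates[index])
            # Penalise similarity to anything that was already picked.
            redundancy = max(
                (cosine_similarity(candidates[index], candidates[s]) for s in selected),
                default=0.0,
            )
            return balance * relevance - (1.0 - balance) * redundancy

        best = max(remaining, key=score)
        selected.append(best)
        remaining.remove(best)
    return selected


query = [1.0, 0.0]
candidates = [[0.9, 0.1], [0.9, 0.12], [0.6, -0.8]]
diverse = mmr(query, candidates, limit=2)
by_relevance = mmr(query, candidates, limit=2, balance=1.0)
```

In this example the second candidate is nearly identical to the first, so the balanced ranking prefers the third, less similar vector, while balance=1.0 degenerates to a pure relevance ordering.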
Payload filtering and requirements
Search and deletion accept AttributeRequirement instances which are evaluated against the stored
payload JSON. Requirements are translated to SQL expressions (for example,
AttributeRequirement.equal becomes payload #>> '{text}' = $2). Unsupported operators raise
NotImplementedError, ensuring the query surface remains explicit.
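A rough sketch of how such a translation could work is shown below. The Requirement dataclass and to_sql function are hypothetical stand-ins, not the AttributeRequirement API, and the set of supported operators is chosen only for illustration. The #>> operator is the Postgres JSON operator that extracts the value at a path as text, which is why values are compared as strings here.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Requirement:
    """Hypothetical stand-in for an attribute requirement."""

    path: tuple[str, ...]
    operator: str
    value: Any


def to_sql(requirement: Requirement, parameter_index: int) -> tuple[str, str]:
    # Path parts are interpolated into the SQL text, so accept plain identifiers only.
    if not all(part.isidentifier() for part in requirement.path):
        raise ValueError(f"Invalid attribute path: {requirement.path!r}")

    column = "payload #>> '{" + ",".join(requirement.path) + "}'"
    placeholder = f"${parameter_index}"  # the value itself travels as a bound parameter
    if requirement.operator == "equal":
        return f"{column} = {placeholder}", str(requirement.value)
    if requirement.operator == "not_equal":
        return f"{column} <> {placeholder}", str(requirement.value)

    # Anything else is rejected explicitly rather than silently ignored.
    raise NotImplementedError(f"Unsupported operator: {requirement.operator}")


clause, argument = to_sql(Requirement(("text",), "equal", "hello world"), 2)
nested_clause, _ = to_sql(Requirement(("meta", "source"), "not_equal", "web"), 3)
```

Keeping the value as a bound parameter and validating the path keeps the generated SQL safe even when requirement values come from user input.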
Putting it together
Combine these adapters with higher-level Draive components to centralise operational data in
Postgres. For example, wire the configuration repository into your configuration state, keep
reusable instruction sets shareable across teams, and persist model interactions for analytics—all
while letting haiway manage connection pooling and logging through the active context.