
Basic usage

1. Declare the outbox table

The package never creates or migrates your schema — that's Alembic's job. make_outbox_table(metadata, table_name="outbox") returns a sqlalchemy.Table you attach to your own MetaData:

from sqlalchemy import MetaData
from faststream_outbox import make_outbox_table

metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")

The returned Table carries three indexes the broker needs at runtime — a partial index for the fetch CTE's unleased branch, a partial index for the expired-lease reclaim branch, and a partial unique index for timer_id deduplication. Alembic autogenerate picks them up alongside the table itself.

2. Create the broker and app

from sqlalchemy.ext.asyncio import create_async_engine
from faststream import FastStream
from faststream_outbox import OutboxBroker

engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
broker = OutboxBroker(engine, outbox_table=outbox_table)
app = FastStream(broker)

3. Register a subscriber

Subscribers work like any FastStream subscriber. Decorate a handler with @broker.subscriber(queue, ...):

@broker.subscriber("orders", max_workers=4)
async def handle(order_id: int) -> None:
    print(f"order {order_id}")

See Subscriber for the full options list, tuning guide, and retry strategies.

4. Publish a message

broker.publish(body, *, queue, session, ...) inserts an outbox row through the caller's AsyncSession. It does not flush, commit, or open its own transaction — the row commits with the caller's domain writes:

from sqlalchemy.ext.asyncio import async_sessionmaker

session_factory = async_sessionmaker(engine, expire_on_commit=False)

async with session_factory() as session, session.begin():
    session.add(Order(id=1))  # Order is your own mapped ORM model (not shown here)
    await broker.publish(1, queue="orders", session=session)
    # session.begin() commits both atomically on exit

Passing anything that is not an AsyncSession raises TypeError. The whole point of the outbox pattern is that the row commits atomically with your domain writes; opening a separate session would defeat it.
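
The atomicity argument can be illustrated without the package. The following is a minimal sketch that uses the standard-library sqlite3 module as a stand-in for PostgreSQL and SQLAlchemy; the table layout and the place_order and counts helpers are hypothetical and are not part of faststream_outbox. It shows only the property described above: the domain row and the outbox row are written in one transaction, so a failure before commit discards both.

```python
import json
import sqlite3


def place_order(conn: sqlite3.Connection, order_id: int, *, fail: bool = False) -> None:
    """Write the order and its outbox row in a single transaction (hypothetical helper)."""
    with conn:  # commits on success, rolls back if the body raises
        conn.execute("INSERT INTO orders (id) VALUES (?)", (order_id,))
        conn.execute(
            "INSERT INTO outbox (queue, body) VALUES (?, ?)",
            ("orders", json.dumps(order_id)),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")


def counts(conn: sqlite3.Connection) -> tuple[int, int]:
    """Return (number of orders, number of outbox rows)."""
    orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    outbox = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
    return orders, outbox


# Illustrative schema only; the real outbox table comes from make_outbox_table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, queue TEXT NOT NULL, body TEXT NOT NULL)"
)

place_order(conn, 1)  # both rows commit together
try:
    place_order(conn, 2, fail=True)  # both rows roll back together
except RuntimeError:
    pass

print(counts(conn))  # (1, 1)
```

Had the outbox row been written through a separate connection, the second call could have left a message with no matching order, which is the inconsistency the outbox pattern is meant to rule out.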

See Publisher for publish_batch, the OutboxPublisher decorator, and chained publishing via OutboxResponse.

Full quickstart

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from faststream import FastStream
from faststream_outbox import OutboxBroker, make_outbox_table

metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")

engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
broker = OutboxBroker(engine, outbox_table=outbox_table)
app = FastStream(broker)


@broker.subscriber("orders", max_workers=4)
async def handle(order_id: int) -> None:
    print(f"order {order_id}")


session_factory = async_sessionmaker(engine, expire_on_commit=False)


@app.after_startup
async def publish_one() -> None:
    async with session_factory() as session, session.begin():
        await broker.publish(1, queue="orders", session=session)

Save the code as app.py and run it with faststream run app:app.

Connection ownership

OutboxBroker does not close the AsyncEngine you pass in; the caller owns its lifecycle. The same engine can be shared with other SQLAlchemy users (your FastAPI app, an Alembic upgrade, etc.), and closing it from the broker would disrupt them. Dispose of the engine yourself in a try/finally block or, when running under FastAPI, let the framework's lifespan handle it (see FastAPI integration).
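
One way to keep engine ownership with the caller is a small wrapper like the sketch below. The run_with_engine helper is hypothetical and not part of the package; it assumes only that the engine exposes an awaitable dispose() method, as SQLAlchemy's AsyncEngine does.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_engine(engine: Any, main: Callable[[], Awaitable[None]]) -> None:
    """Run main(), then dispose of the engine even if main() raises.

    Hypothetical helper: `engine` is expected to be an AsyncEngine
    (or anything else with an awaitable dispose() method).
    """
    try:
        await main()
    finally:
        # The caller, not the broker, releases the pooled connections.
        await engine.dispose()
```

With a real engine this would be called as asyncio.run(run_with_engine(engine, main)), where main is whatever coroutine function starts and runs your application.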

  • How it works: architecture, lease invariant, at-least-once semantics
  • Subscriber: tuning, retry strategies, slow-handler queue segregation
  • Publisher: publish_batch, OutboxPublisher, chained publishing
  • FastAPI integration: OutboxRouter, Depends(get_session) pattern