Tutorial: Your first outbox app
What you'll build
A tiny app where calling broker.publish inside a database transaction
triggers a handler — no message bus required, just Postgres. By the end
you will have run a single message end-to-end and seen the handler
print it.
Before you start
- Python 3.13+
- Docker (for a one-line Postgres container)
- uv for project setup
- Roughly ten minutes
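If you want to confirm the prerequisites before going further, a small standard-library check like the one below may help. It is only a sketch: the helper names python_ok and missing_tools are made up for this tutorial, and the check only tells you whether docker and uv are on your PATH, not whether they work.

```python
import shutil
import sys

MINIMUM_PYTHON = (3, 13)  # taken from the prerequisites list above


def python_ok(version: tuple, minimum: tuple = MINIMUM_PYTHON) -> bool:
    """Return True when (major, minor) in `version` is at least `minimum`."""
    return tuple(version[:2]) >= minimum


def missing_tools(tools: tuple = ("docker", "uv")) -> list:
    """Return the names in `tools` that cannot be found on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


if __name__ == "__main__":
    print("python ok:", python_ok(sys.version_info[:2]))
    print("missing tools:", missing_tools())
```

An empty "missing tools" list and "python ok: True" mean you are ready for Step 1.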
Step 1: Install
Start a fresh project directory and add the package. The asyncpg
extra brings the async Postgres driver; validate is the Alembic-based
schema check helper; cli is FastStream's faststream run command.
mkdir outbox-tutorial && cd outbox-tutorial
uv init
uv add 'faststream-outbox[asyncpg,validate]' 'faststream[cli]'
You should see:
Initialized project `outbox-tutorial`
Using CPython 3.14.4
Creating virtual environment at: .venv
Resolved 26 packages in 61ms
Installed 24 packages in 37ms
+ alembic==1.18.4
+ annotated-types==0.7.0
+ anyio==4.13.0
+ asyncpg==0.31.0
+ click==8.4.1
+ fast-depends==3.0.8
+ faststream==0.7.1
+ faststream-outbox==0.8.0
+ greenlet==3.5.1
+ idna==3.18
+ mako==1.3.12
+ markdown-it-py==4.2.0
+ markupsafe==3.0.3
+ mdurl==0.1.2
+ pydantic==2.13.4
+ pydantic-core==2.46.4
+ pygments==2.20.0
+ rich==15.0.0
+ shellingham==1.5.4
+ sqlalchemy==2.0.50
+ typer==0.21.1
+ typing-extensions==4.15.0
+ typing-inspection==0.4.2
+ watchfiles==1.1.1
Your exact pinned versions will differ; that is fine. The Python version
line will reflect whatever uv resolves on your machine — 3.13 or 3.14
are both fine.
Step 2: Start Postgres
Run a disposable Postgres 17 container with the credentials we'll wire into the connection string in Step 3.
docker run --rm -d --name outbox-postgres \
  -e POSTGRES_USER=outbox -e POSTGRES_PASSWORD=outbox -e POSTGRES_DB=outbox \
  -p 5432:5432 postgres:17
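You should see a container ID printed.
Give it a couple of seconds and confirm it's ready, for example with docker exec outbox-postgres pg_isready -U outbox -d outbox.
You should see a line reporting that the server is accepting connections.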
Step 3: Declare the outbox table
Create app.py. This sets up the SQLAlchemy MetaData, declares the
outbox table on it, builds an async engine, and wires the broker and
FastStream app. We'll add the handler in Step 5.
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from faststream import FastStream
from faststream_outbox import OutboxBroker, make_outbox_table
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
broker = OutboxBroker(engine, outbox_table=outbox_table)
app = FastStream(broker)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
make_outbox_table returns a sqlalchemy.Table attached to your
MetaData. The package never creates or migrates the schema on its
own — Step 4 is where we run that.
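The connection string passed to create_async_engine packs the Step 2 container settings into a single URL. The following standard-library sketch takes that URL apart so you can see which piece came from which docker flag. It is illustrative only; SQLAlchemy does its own URL parsing, and the summary dictionary is just a name chosen for this example.

```python
from urllib.parse import urlsplit

# The same URL used in app.py above.
DSN = "postgresql+asyncpg://outbox:outbox@localhost:5432/outbox"

parts = urlsplit(DSN)
# The scheme has the form "<dialect>+<driver>".
dialect, _, driver = parts.scheme.partition("+")

summary = {
    "dialect": dialect,                      # the database family
    "driver": driver,                        # installed by the asyncpg extra in Step 1
    "user": parts.username,                  # POSTGRES_USER
    "password": parts.password,              # POSTGRES_PASSWORD
    "host": parts.hostname,                  # the container is published on localhost
    "port": parts.port,                      # -p 5432:5432
    "database": parts.path.lstrip("/"),      # POSTGRES_DB
}

for key, value in summary.items():
    print(f"{key:>8}: {value}")
```

If you change any of the docker run settings in Step 2, the matching part of the URL has to change with it.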
Step 4: Create the schema
Create a second file, create_schema.py, that runs metadata.create_all
once. Real apps use Alembic; for a tutorial a one-shot script is the
honest shape.
import asyncio

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine

from faststream_outbox import make_outbox_table

metadata = MetaData()
make_outbox_table(metadata, table_name="outbox")


async def main() -> None:
    engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
    async with engine.begin() as conn:
        # create_all is synchronous, so run it through the async connection
        await conn.run_sync(metadata.create_all)
    await engine.dispose()


asyncio.run(main())
Real projects import metadata and outbox_table from a shared module
rather than redeclaring them here; this script is self-contained for the
tutorial's narrow scope.
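Run it, for example with uv run python create_schema.py.
You should see no output; that is success.
Verify the table landed, for example with docker exec outbox-postgres psql -U outbox -d outbox -c '\d outbox'.
You should see: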
                                          Table "public.outbox"
      Column      |           Type           | Collation | Nullable |              Default
------------------+--------------------------+-----------+----------+------------------------------------
 id               | bigint                   |           | not null | nextval('outbox_id_seq'::regclass)
 queue            | character varying(255)   |           | not null |
 payload          | bytea                    |           | not null |
 headers          | jsonb                    |           |          |
 attempts_count   | bigint                   |           | not null | '0'::bigint
 deliveries_count | bigint                   |           | not null | '0'::bigint
 created_at       | timestamp with time zone |           | not null | now()
 next_attempt_at  | timestamp with time zone |           | not null | now()
 first_attempt_at | timestamp with time zone |           |          |
 last_attempt_at  | timestamp with time zone |           |          |
 acquired_at      | timestamp with time zone |           |          |
 acquired_token   | uuid                     |           |          |
 timer_id         | character varying(255)   |           |          |
Indexes:
    "outbox_pkey" PRIMARY KEY, btree (id)
    "outbox_lease_idx" btree (queue, acquired_at) WHERE acquired_token IS NOT NULL
    "outbox_pending_idx" btree (queue, next_attempt_at) WHERE acquired_token IS NULL
    "outbox_timer_id_uq" UNIQUE, btree (queue, timer_id) WHERE timer_id IS NOT NULL
Check constraints:
    "outbox_lease_ck" CHECK ((acquired_token IS NULL) = (acquired_at IS NULL))
The output lists three partial indexes alongside the columns, plus a check constraint that keeps a lease either fully set or fully unset. The broker relies on the indexes and the constraint at runtime; you don't need to think about them.
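If you are curious what the lease check constraint and the unique partial index on timer_id actually enforce, the sketch below reproduces just those two rules. It deliberately swaps Postgres for an in-memory SQLite database from the standard library so it runs anywhere, and it copies only the relevant columns; the try_insert helper and the sample values are invented for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# A cut-down stand-in for the outbox table: only the lease and timer columns.
conn.execute(
    """
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY,
        queue TEXT NOT NULL,
        acquired_at TEXT,
        acquired_token TEXT,
        timer_id TEXT,
        CONSTRAINT outbox_lease_ck
            CHECK ((acquired_token IS NULL) = (acquired_at IS NULL))
    )
    """
)
# Partial unique index: uniqueness applies only to rows that have a timer_id.
conn.execute(
    "CREATE UNIQUE INDEX outbox_timer_id_uq ON outbox (queue, timer_id) "
    "WHERE timer_id IS NOT NULL"
)


def try_insert(**columns) -> bool:
    """Insert one row; return False if a constraint rejects it."""
    names = ", ".join(columns)
    marks = ", ".join("?" for _ in columns)
    try:
        conn.execute(
            f"INSERT INTO outbox ({names}) VALUES ({marks})",
            tuple(columns.values()),
        )
        return True
    except sqlite3.IntegrityError:
        return False


results = {
    "no lease": try_insert(queue="orders"),
    "full lease": try_insert(
        queue="orders", acquired_at="2026-06-12T08:07:06Z", acquired_token="token-1"
    ),
    "token without timestamp": try_insert(queue="orders", acquired_token="token-2"),
    "timestamp without token": try_insert(
        queue="orders", acquired_at="2026-06-12T08:07:06Z"
    ),
    "first timer": try_insert(queue="orders", timer_id="reminder-1"),
    "duplicate timer": try_insert(queue="orders", timer_id="reminder-1"),
    "same timer, other queue": try_insert(queue="emails", timer_id="reminder-1"),
}

for label, accepted in results.items():
    print(f"{label:>24}: {'accepted' if accepted else 'rejected'}")
```

The half-set leases and the duplicate timer are rejected; everything else is accepted, including any number of rows with no timer_id at all.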
Step 5: Define a handler
Add a subscriber to the bottom of app.py. The handler will run once
per row published to the orders queue.
@broker.subscriber("orders")
async def handle(order_id: int) -> None:
    print(f"got order {order_id}")
No command yet — the handler runs once we publish a row and start the app.
Step 6: Publish a row
Add an @app.after_startup hook to the bottom of app.py that publishes
one row right after the app boots. broker.publish inserts an outbox
row through the session you give it — the row commits with the
surrounding transaction. There is no separate "send" step; the commit
is the send.
@app.after_startup
async def publish_one() -> None:
    # session.begin() commits on exit, and that commit is what "sends" the row
    async with session_factory() as session, session.begin():
        await broker.publish(1, queue="orders", session=session)
The full app.py now reads:
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from faststream import FastStream
from faststream_outbox import OutboxBroker, make_outbox_table

metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
broker = OutboxBroker(engine, outbox_table=outbox_table)
app = FastStream(broker)
session_factory = async_sessionmaker(engine, expire_on_commit=False)


# Runs once per row published to the "orders" queue.
@broker.subscriber("orders")
async def handle(order_id: int) -> None:
    print(f"got order {order_id}")


# Publishes one row right after the app boots.
@app.after_startup
async def publish_one() -> None:
    async with session_factory() as session, session.begin():
        await broker.publish(1, queue="orders", session=session)
Step 7: Run it
Start the app with FastStream's CLI, for example uv run faststream run app:app.
You should see:
2026-06-12 08:07:06,179 INFO - FastStream app starting...
2026-06-12 08:07:06,179 INFO - orders | - `Handle` waiting for messages
2026-06-12 08:07:06,276 INFO - FastStream app started successfully! To exit, press CTRL+C
2026-06-12 08:07:06,283 INFO - orders | - Received
got order 1
2026-06-12 08:07:06,283 INFO - orders | - Processed
The "got order 1" line is your handler firing. The @app.after_startup
hook inserted the row; the subscriber's fetch loop picked it up and
dispatched it, the handler ran, and the row was deleted.
Press Ctrl-C:
2026-06-12 08:07:11,989 INFO - FastStream app shutting down...
2026-06-12 08:07:11,990 INFO - FastStream app shut down gracefully.
2026-06-12 08:07:11,990 INFO - | - callback for Task-2 is being executed...
2026-06-12 08:07:11,990 INFO - | - callback for Task-3 is being executed...
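The lifecycle described in this step (a row is inserted, a fetch loop picks it up, the handler runs, the row is deleted) can be modelled in a few lines. The sketch below is a toy model, not faststream-outbox's implementation: it uses an in-memory SQLite table from the standard library, assumes a JSON-encoded payload, and the publish and drain functions are hypothetical names chosen for the example.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox ("
    " id INTEGER PRIMARY KEY,"
    " queue TEXT NOT NULL,"
    " payload BLOB NOT NULL,"
    " acquired_token TEXT)"
)


def publish(queue: str, message) -> None:
    """Insert one pending row (JSON encoding is an assumption of this sketch)."""
    conn.execute(
        "INSERT INTO outbox (queue, payload) VALUES (?, ?)",
        (queue, json.dumps(message).encode()),
    )
    conn.commit()


def drain(queue: str, handler) -> int:
    """Lease, handle, and delete pending rows one at a time; return the count."""
    handled = 0
    while True:
        row = conn.execute(
            "SELECT id, payload FROM outbox"
            " WHERE queue = ? AND acquired_token IS NULL"
            " ORDER BY id LIMIT 1",
            (queue,),
        ).fetchone()
        if row is None:
            return handled
        row_id, payload = row
        # Mark the row as leased so another worker would skip it.
        token = str(uuid.uuid4())
        conn.execute(
            "UPDATE outbox SET acquired_token = ? WHERE id = ?", (token, row_id)
        )
        conn.commit()
        handler(json.loads(payload))
        # Delete only after the handler returned without raising.
        conn.execute(
            "DELETE FROM outbox WHERE id = ? AND acquired_token = ?",
            (row_id, token),
        )
        conn.commit()
        handled += 1


seen = []
publish("orders", 1)
handled_count = drain("orders", seen.append)
remaining = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
print(f"handled={handled_count} seen={seen} remaining={remaining}")
```

The real broker also tracks attempts, retry timing, and lease expiry through the other columns shown in Step 4; this model leaves all of that out.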
What you just built
- An outbox table inside your own Postgres database, owned by your schema.
- A FastStream app whose "transport" is rows in that table — no external broker.
- A handler that ran exactly once, in-process, against a row committed by your own session.
The interesting property is what happened inside publish_one: the
broker.publish call inserted a row into the outbox table through the
session you opened. session.begin() committed it. If that commit had
rolled back — say, because a domain write on the same session
failed — the outbox row would have rolled back with it. There is no
universe where the row exists but the domain write doesn't, or vice
versa. That atomicity is the whole point.
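To watch that atomicity happen without any of the tutorial's moving parts, here is a minimal sketch that swaps Postgres and the broker for an in-memory SQLite database from the standard library. The orders table, its CHECK rule, and the place_order helper are assumptions made up for the demonstration; the point is only that the outbox insert and the domain write share one transaction.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# A hypothetical domain table whose CHECK lets us force a failing write.
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, total INTEGER NOT NULL CHECK (total > 0))"
)
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, queue TEXT NOT NULL, payload TEXT NOT NULL)"
)
conn.commit()


def place_order(order_id: int, total: int) -> bool:
    """Write the outbox row and the domain row in one transaction."""
    try:
        with conn:  # commits on success, rolls back if the block raises
            conn.execute(
                "INSERT INTO outbox (queue, payload) VALUES (?, ?)",
                ("orders", str(order_id)),
            )
            conn.execute(
                "INSERT INTO orders (id, total) VALUES (?, ?)", (order_id, total)
            )
        return True
    except sqlite3.IntegrityError:
        return False


def counts() -> tuple:
    """Return (number of orders, number of outbox rows)."""
    orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    outbox = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
    return orders, outbox


ok_result = place_order(1, 100)   # both rows commit together
after_ok = counts()
bad_result = place_order(2, 0)    # domain write fails, outbox row rolls back too
after_bad = counts()

print(ok_result, after_ok)
print(bad_result, after_bad)
```

After the failing call the two counts are unchanged, even though the outbox insert ran first and succeeded on its own.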
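Clean up
Stop the Postgres container, for example with docker stop outbox-postgres. Because it was started with --rm, stopping it also removes it.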
What's next
- Subscriber reference: tuning, worker counts, retry strategies.
- Publisher reference: publish_batch, the OutboxPublisher decorator, chained publishing.
- FastAPI integration: wire the outbox into a real HTTP service with Depends(get_session).
- Schema validation: this tutorial installed the validate extra; call validate_schema() from a startup hook or /health check to catch a table that drifted from what the broker expects (e.g. a missing partial index after a migration).
- Tutorial: Add a Kafka relay, which extends this app to forward each row into Kafka with one stacked decorator.