Alembic migrations¶
The package never creates or migrates your schema — that's
Alembic's job. This page shows what
Alembic produces against make_outbox_table() and
make_dlq_table(), and gives recipes for drift detection and DLQ
retention via partition rotation.
Initial migration¶
Add make_outbox_table(metadata, table_name="outbox") to whatever
MetaData your Alembic env.py exposes as target_metadata, then
run alembic revision --autogenerate -m "outbox". The captured
upgrade() body (SQLAlchemy 2.0.50, Alembic 1.18.4, Postgres 17,
against an empty schema):
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('outbox',
    sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False),
    sa.Column('queue', sa.String(length=255), nullable=False),
    sa.Column('payload', sa.LargeBinary(), nullable=False),
    sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('attempts_count', sa.BigInteger(), server_default='0', nullable=False),
    sa.Column('deliveries_count', sa.BigInteger(), server_default='0', nullable=False),
    sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
    sa.Column('next_attempt_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
    sa.Column('first_attempt_at', sa.DateTime(timezone=True), nullable=True),
    sa.Column('last_attempt_at', sa.DateTime(timezone=True), nullable=True),
    sa.Column('acquired_at', sa.DateTime(timezone=True), nullable=True),
    sa.Column('acquired_token', sa.Uuid(), nullable=True),
    sa.Column('timer_id', sa.String(length=255), nullable=True),
    sa.PrimaryKeyConstraint('id')
)
op.create_index('outbox_lease_idx', 'outbox', ['queue', 'acquired_at'], unique=False,
    postgresql_where=sa.text('acquired_token IS NOT NULL'))
op.create_index('outbox_pending_idx', 'outbox', ['queue', 'next_attempt_at'], unique=False,
    postgresql_where=sa.text('acquired_token IS NULL'))
op.create_index('outbox_timer_id_uq', 'outbox', ['queue', 'timer_id'], unique=True,
    postgresql_where=sa.text('timer_id IS NOT NULL'))
# ### end Alembic commands ###
The three indexes carry load-bearing partial predicates:
- outbox_pending_idx: (queue, next_attempt_at) WHERE acquired_token IS NULL. Branch A of the fetch CTE (unleased rows) is served by this index. The WHERE clause is written so the Postgres planner recognizes the implied predicate and uses the partial index; drop the predicate and the planner falls back to a sequential scan as the table grows.
- outbox_lease_idx: (queue, acquired_at) WHERE acquired_token IS NOT NULL. Branch B of the fetch CTE (expired-lease reclaim) is served by this index. Same story: the predicate is load-bearing for fetch performance.
- outbox_timer_id_uq: unique (queue, timer_id) WHERE timer_id IS NOT NULL. Backs the timer_id dedup contract via pg_insert(...).on_conflict_do_nothing(...). Without the partial predicate, the unique constraint applies to all rows and breaks non-timer publishes.
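The dedup behaviour of a partial unique index can be tried without a Postgres instance. The following is only a sketch: it uses the standard-library sqlite3 module as a stand-in (SQLite also supports partial unique indexes), a cut-down three-column table, and a hypothetical publish() helper. It does not reproduce the package's actual insert statement.

```python
import sqlite3

# SQLite stands in for Postgres; the table is a cut-down outbox.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox ("
    "id INTEGER PRIMARY KEY, queue TEXT NOT NULL, timer_id TEXT)"
)
conn.execute(
    "CREATE UNIQUE INDEX outbox_timer_id_uq ON outbox (queue, timer_id) "
    "WHERE timer_id IS NOT NULL"
)


def row_count():
    return conn.execute("SELECT count(*) FROM outbox").fetchone()[0]


def publish(queue, timer_id=None):
    """Hypothetical helper: insert a row, return True if it was stored."""
    before = row_count()
    conn.execute(
        "INSERT INTO outbox (queue, timer_id) VALUES (?, ?) "
        "ON CONFLICT DO NOTHING",
        (queue, timer_id),
    )
    return row_count() > before


first = publish("emails", "reminder-42")      # stored
second = publish("emails", "reminder-42")     # same (queue, timer_id): skipped
other_queue = publish("sms", "reminder-42")   # different queue: stored
plain_a = publish("emails")                   # no timer_id: not in the index
plain_b = publish("emails")                   # so repeats are unrestricted
total = row_count()
```

Rows without a timer_id never enter the partial index, so only timer publishes are subject to the uniqueness check.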
Columns the operator can mostly ignore: attempts_count and
deliveries_count (broker book-keeping for "handler invocations" vs
"claims including expired-lease re-claims"), first_attempt_at and
last_attempt_at (debugging aids on retry-heavy rows). All four are
maintained by the broker; no application code touches them.
The # please adjust! comment from Alembic is misleading here —
don't adjust. The column types, predicates, and indexes are
exactly what the broker depends on. The
validate_schema() check — when you wire
it into a /health probe or CI gate — fails when the live DB drifts from
this declaration. (It is opt-in; it never runs at broker.start().)
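One way to wire the check into a /health probe is sketched below. It assumes only what this page states, namely that validate_schema() is awaitable and raises on drift. The schema_health() name, the status codes, and the response shape are illustrative choices, and the two stub coroutines stand in for a real broker.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


async def schema_health(
    validate: Callable[[], Awaitable[None]],
) -> Tuple[int, dict]:
    """Map a schema check onto an HTTP-style (status, body) pair."""
    try:
        await validate()
    except Exception as exc:  # assumption: drift surfaces as an exception
        return 503, {"schema": "drifted", "detail": str(exc)}
    return 200, {"schema": "ok"}


# Stubs standing in for broker.validate_schema in this sketch.
async def _ok() -> None:
    return None


async def _drifted() -> None:
    raise RuntimeError("outbox_pending_idx is missing its predicate")


healthy = asyncio.run(schema_health(_ok))
unhealthy = asyncio.run(schema_health(_drifted))
```

In a real service the call would be schema_health(broker.validate_schema), invoked from whatever handler your web framework exposes for the probe.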
Adding the DLQ after the fact¶
To opt into Dead-letter queue audit, add
make_dlq_table(metadata, table_name="outbox_dlq") to your
MetaData and run alembic revision --autogenerate -m "outbox-dlq".
The captured upgrade() body:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('outbox_dlq',
    sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False),
    sa.Column('original_id', sa.BigInteger(), nullable=False),
    sa.Column('queue', sa.String(length=255), nullable=False),
    sa.Column('payload', sa.LargeBinary(), nullable=False),
    sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('deliveries_count', sa.BigInteger(), nullable=False),
    sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
    sa.Column('failed_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
    sa.Column('failure_reason', sa.String(length=64), nullable=False),
    sa.Column('last_exception', sa.String(), nullable=True),
    sa.Column('timer_id', sa.String(length=255), nullable=True),
    sa.PrimaryKeyConstraint('id')
)
op.create_index('outbox_dlq_queue_failed_idx', 'outbox_dlq', ['queue', 'failed_at'], unique=False)
# ### end Alembic commands ###
This is purely additive: no ALTER TABLE against the outbox
table itself, no added column, no constraint flip. The runtime change
that activates the DLQ is the broker's
atomicity CTE (DELETE … RETURNING → INSERT
INTO <dlq>), driven by OutboxBroker(..., dlq_table=…), not by
schema state.
The non-unique outbox_dlq_queue_failed_idx on (queue, failed_at)
supports "show me recent failures for queue X" queries and
doubles as the pruning index once the table is converted to a
partitioned one (see § DLQ retention via partition drop).
Drift detection in CI¶
Run validate_schema() after
alembic upgrade head in your CI pipeline. A small standalone
script:
import asyncio

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine

from faststream_outbox import OutboxBroker, make_outbox_table


async def main() -> None:
    metadata = MetaData()
    outbox_table = make_outbox_table(metadata, table_name="outbox")
    engine = create_async_engine("postgresql+asyncpg://...")
    broker = OutboxBroker(engine, outbox_table=outbox_table)
    try:
        # Raises when the live schema has drifted from the declaration.
        await broker.validate_schema()
    finally:
        # Close pooled connections even when validation fails.
        await engine.dispose()


asyncio.run(main())
validate_schema() raises on drift, so the script exits non-zero and CI fails before the deploy step.
This check is opt-in for /health and not always-on at
broker.start(). The reason: a running migration plus an always-on
validator would race. Operators must be able to roll forward a new
schema version without spinning every pod into a crash loop. The drift
check belongs between alembic upgrade head and the deploy step,
not inside the running service.
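The ordering described above (migrate, then check, then deploy) can be sketched as a small gate runner. This is an illustration only: run_gate() is a hypothetical helper, and in a real pipeline the steps would be commands such as alembic upgrade head, the drift script, and your own deploy command. Stub commands are used here so the sketch runs anywhere.

```python
import subprocess
import sys
from typing import Optional, Sequence


def run_gate(steps: Sequence[Sequence[str]]) -> Optional[int]:
    """Run commands in order and stop at the first failure.

    Returns the index of the failing step, or None if every step passed.
    """
    for index, command in enumerate(steps):
        if subprocess.run(command).returncode != 0:
            return index  # later steps (for example the deploy) never run
    return None


# Stub commands: "migrate" succeeds, "drift check" fails, "deploy" is skipped.
steps = [
    [sys.executable, "-c", "raise SystemExit(0)"],
    [sys.executable, "-c", "raise SystemExit(3)"],
    [sys.executable, "-c", "raise SystemExit(0)"],
]
first_failure = run_gate(steps)
all_green = run_gate(steps[:1])
```

Most CI systems give you this stop-on-first-failure behaviour for free; the point of the sketch is only the position of the drift check between the other two steps.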
If you also pass dlq_table=make_dlq_table(metadata) when
constructing the broker, validate_schema() checks both tables in
one call and surfaces drift on either one.
Both autogenerate and validate_schema() run with
compare_server_default=False, so server-default drift is not
detected — neither the autogenerated migration nor the drift gate will
flag a column whose server_default is missing or wrong. The
consequence that bites is a missing server_default=now() on
next_attempt_at; see the server-defaults caveat in
Schema validation.
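Because neither tool compares server defaults, a separate check is needed if you want to catch this case. The sketch below is one possible shape, not part of the package: it assumes you have already read column_name and column_default for the outbox table from Postgres' information_schema.columns view into a dict, and it covers only the two timestamp defaults shown in the migration above.

```python
from typing import Dict, List, Optional

# Defaults taken from the autogenerated migration on this page.
EXPECTED_DEFAULTS = {
    "created_at": "now()",
    "next_attempt_at": "now()",
}


def missing_defaults(live: Dict[str, Optional[str]]) -> List[str]:
    """Compare live column defaults against the expected ones.

    `live` maps column name -> column_default text (None when absent),
    as a query against information_schema.columns would return it.
    """
    problems = []
    for column, expected in EXPECTED_DEFAULTS.items():
        actual = live.get(column)
        if actual is None or actual.strip().lower() != expected:
            problems.append(f"{column}: expected {expected}, found {actual!r}")
    return problems


# Example input: next_attempt_at has lost its default.
report = missing_defaults({"created_at": "now()", "next_attempt_at": None})
clean = missing_defaults({"created_at": "now()", "next_attempt_at": "now()"})
```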
Fixing drift autogenerate can't see¶
Two kinds of drift that
validate_schema() reports cannot be
remediated by alembic revision --autogenerate — the same blindness that
let them drift in also stops autogenerate from emitting a fix:
- The outbox_lease_ck CHECK constraint. Alembic's compare_metadata has no check-constraint comparator, so a missing or altered CHECK never appears in an autogenerated migration.
- Partial-index predicates. Alembic's index comparator ignores postgresql_where, so an index that exists but was created non-partial, with the wrong WHERE, or (for outbox_timer_id_uq) non-unique is invisible to the diff.
When validate_schema() raises for one of these, its error ends with a
pointer to this section. Re-running autogenerate produces an empty
upgrade() — hand-write the migration instead, then re-run
validate_schema() to confirm the drift is cleared.
Restore the lease CHECK¶
# Drop first ONLY if the constraint exists with a wrong predicate; skip the
# drop if it is absent entirely.
op.drop_constraint('outbox_lease_ck', 'outbox', type_='check')
op.create_check_constraint(
    'outbox_lease_ck',
    'outbox',
    '(acquired_token IS NULL) = (acquired_at IS NULL)',
)
Restore a partial index¶
Drop the drifted index and recreate it with its load-bearing predicate. The three indexes and their expected shape:
| Index | Columns | Unique | postgresql_where |
|---|---|---|---|
| outbox_pending_idx | queue, next_attempt_at | no | acquired_token IS NULL |
| outbox_lease_idx | queue, acquired_at | no | acquired_token IS NOT NULL |
| outbox_timer_id_uq | queue, timer_id | yes | timer_id IS NOT NULL |
op.drop_index('outbox_timer_id_uq', table_name='outbox')
op.create_index(
    'outbox_timer_id_uq',
    'outbox',
    ['queue', 'timer_id'],
    unique=True,
    postgresql_where=sa.text('timer_id IS NOT NULL'),
)
Substitute the columns / unique / predicate from the table above for
outbox_pending_idx and outbox_lease_idx.
The recipes pass literal names ('outbox_lease_ck', 'outbox_timer_id_uq') —
the exact names the package emits with no naming_convention.
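If more than one index has drifted, the substitution can be driven from data instead of copied by hand. The following is a sketch under stated assumptions: restore_partial_indexes() is a hypothetical helper, and op and text are passed in so the example runs without a database. In a real migration you would pass Alembic's op and sa.text. The recording stub exists only for demonstration.

```python
# (name, columns, unique, predicate), copied from the table above.
PARTIAL_INDEXES = [
    ("outbox_pending_idx", ["queue", "next_attempt_at"], False,
     "acquired_token IS NULL"),
    ("outbox_lease_idx", ["queue", "acquired_at"], False,
     "acquired_token IS NOT NULL"),
    ("outbox_timer_id_uq", ["queue", "timer_id"], True,
     "timer_id IS NOT NULL"),
]


def restore_partial_indexes(op, text, drifted, table_name="outbox"):
    """Drop and recreate each index named in `drifted`."""
    for name, columns, unique, predicate in PARTIAL_INDEXES:
        if name not in drifted:
            continue
        op.drop_index(name, table_name=table_name)
        op.create_index(
            name, table_name, columns,
            unique=unique, postgresql_where=text(predicate),
        )


class _RecordingOp:
    """Stand-in for alembic.op that records calls instead of running DDL."""

    def __init__(self):
        self.calls = []

    def drop_index(self, name, table_name):
        self.calls.append(("drop", name))

    def create_index(self, name, table_name, columns, unique, postgresql_where):
        self.calls.append(("create", name, tuple(columns), unique, postgresql_where))


recorder = _RecordingOp()
restore_partial_indexes(recorder, str, {"outbox_lease_idx"})
```

Inside upgrade() the call would read restore_partial_indexes(op, sa.text, {"outbox_lease_idx"}).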
Naming conventions: the CHECK name doesn't matter¶
validate_schema()'s CHECK probe matches the lease constraint by predicate,
not name. So if your MetaData carries a SQLAlchemy naming_convention with a
ck key, you don't need to match any particular name — create the constraint
under whatever name your migration produces and it will validate, as long as its
predicate is (acquired_token IS NULL) = (acquired_at IS NULL). The literal
op.create_check_constraint('outbox_lease_ck', ...) recipe above is fine even
under a convention.
(Why the probe ignores the name: a ck convention re-templates the in-memory
CheckConstraint.name to e.g. ck_outbox_outbox_lease_ck, but a hand-written
op.create_check_constraint('outbox_lease_ck', ...) creates the literal name
verbatim — Alembic op functions don't apply the convention. The live name is
therefore unpredictable, so the probe keys off the stable predicate instead.)
The index recipes still use literal names, because the explicitly-named
indexes (outbox_pending_idx etc.) are not re-templated by the ix/uq
convention keys — those only rename auto-named indexes.
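The name mismatch described above can be illustrated with plain string formatting. This sketch does not call SQLAlchemy; it only mimics the %(table_name)s / %(constraint_name)s template syntax that naming conventions use, with an example ck template chosen for illustration.

```python
# Example ck template in SQLAlchemy's naming_convention syntax (illustrative).
convention = {"ck": "ck_%(table_name)s_%(constraint_name)s"}

# Name the in-memory CheckConstraint ends up with under the convention.
in_memory_name = convention["ck"] % {
    "table_name": "outbox",
    "constraint_name": "outbox_lease_ck",
}

# Name a hand-written op.create_check_constraint('outbox_lease_ck', ...) creates.
literal_name = "outbox_lease_ck"

names_differ = in_memory_name != literal_name
```

Since the two names can legitimately differ, a name-based probe would report false drift, which is why matching on the predicate is the stable choice.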
DLQ retention via partition drop¶
Plain DELETE FROM outbox_dlq WHERE failed_at < now() - interval '90
days' works fine for low-volume DLQs (< ~1 GB / month) and needs no
schema change. For higher volume, converting the DLQ to a
range-partitioned table by failed_at lets you drop entire
partitions instead of deleting row by row — orders-of-magnitude
faster, no vacuum debt.
One-time migration to partitioned DLQ¶
# ### Convert outbox_dlq to range-partitioned by failed_at ###
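# Rename the existing table out of the way. Its index keeps its old name
# after the rename, so rename that as well; otherwise the CREATE INDEX
# below fails because the name is already taken.
op.rename_table('outbox_dlq', 'outbox_dlq_old')
op.execute("""
    ALTER INDEX outbox_dlq_queue_failed_idx
    RENAME TO outbox_dlq_old_queue_failed_idx;
""")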
# Create the partitioned parent. The PRIMARY KEY must include the
# partition key, so we extend it to (id, failed_at).
op.execute("""
    CREATE TABLE outbox_dlq (
        id               BIGSERIAL NOT NULL,
        original_id      BIGINT NOT NULL,
        queue            VARCHAR(255) NOT NULL,
        payload          BYTEA NOT NULL,
        headers          JSONB,
        deliveries_count BIGINT NOT NULL,
        created_at       TIMESTAMPTZ NOT NULL,
        failed_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
        failure_reason   VARCHAR(64) NOT NULL,
        last_exception   TEXT,
        timer_id         VARCHAR(255),
        PRIMARY KEY (id, failed_at)
    ) PARTITION BY RANGE (failed_at);
""")
op.execute("""
CREATE INDEX outbox_dlq_queue_failed_idx
ON outbox_dlq (queue, failed_at);
""")
# Create initial partitions covering recent + current month.
op.execute("""
CREATE TABLE outbox_dlq_2026_05 PARTITION OF outbox_dlq
FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
CREATE TABLE outbox_dlq_2026_06 PARTITION OF outbox_dlq
FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');
""")
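# Move surviving rows into the partitioned table. Rows older than the
# cutoff are not copied and are discarded when outbox_dlq_old is dropped.
op.execute("""
    INSERT INTO outbox_dlq
    SELECT * FROM outbox_dlq_old
    WHERE failed_at >= '2026-05-01';
""")
# Copied rows keep their ids, so advance the new table's sequence past
# them; otherwise new DLQ rows would reuse ids starting from 1.
op.execute("""
    SELECT setval(
        pg_get_serial_sequence('outbox_dlq', 'id'),
        COALESCE(max(id), 1),
        max(id) IS NOT NULL
    ) FROM outbox_dlq;
""")
op.drop_table('outbox_dlq_old')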
validate_schema() continues to work against the partitioned table —
Alembic's autogenerate ignores partition boundaries when comparing
column shape.
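The initial partitions in the migration are hard-coded to two months. If you want to cover a longer span, the DDL can be generated. The following is a minimal sketch with hypothetical helper names (add_months, monthly_partition_ddl); it interpolates the table name directly into SQL, so it assumes the name is a trusted constant from your own migration and not user input.

```python
from datetime import date
from typing import List


def add_months(month_start: date, months: int) -> date:
    """Return the first day of the month `months` after `month_start`."""
    index = month_start.year * 12 + (month_start.month - 1) + months
    return date(index // 12, index % 12 + 1, 1)


def monthly_partition_ddl(parent: str, first_month: date, count: int) -> List[str]:
    """Build CREATE TABLE ... PARTITION OF statements, one per month."""
    statements = []
    for offset in range(count):
        start = add_months(first_month, offset)
        end = add_months(start, 1)  # upper bound is exclusive
        name = f"{parent}_{start:%Y_%m}"
        statements.append(
            f"CREATE TABLE {name} PARTITION OF {parent} "
            f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
        )
    return statements


ddl = monthly_partition_ddl("outbox_dlq", date(2026, 5, 1), 2)
year_end = monthly_partition_ddl("outbox_dlq", date(2026, 12, 1), 1)
```

Each generated statement can be passed to op.execute() in place of the hard-coded pair.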
Monthly cron: create next, drop oldest¶
Run from a Postgres cron (or pgAgent, or a sidecar job) once a month. Adjust the retention window to taste:
DO $$
DECLARE
next_month DATE := date_trunc('month', now() + interval '1 month');
next_month_end DATE := next_month + interval '1 month';
drop_month DATE := date_trunc('month', now() - interval '12 months');
next_name TEXT := 'outbox_dlq_' || to_char(next_month, 'YYYY_MM');
drop_name TEXT := 'outbox_dlq_' || to_char(drop_month, 'YYYY_MM');
BEGIN
EXECUTE format(
'CREATE TABLE IF NOT EXISTS %I PARTITION OF outbox_dlq
FOR VALUES FROM (%L) TO (%L);',
next_name, next_month, next_month_end
);
EXECUTE format('DROP TABLE IF EXISTS %I;', drop_name);
END $$;
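The pattern is "always have next month's partition in place before any row could land in it" plus "drop what has aged out of the retention window." Both operations are O(1) regardless of DLQ row count. Note that the script drops only the single partition that is exactly 12 months old, so a skipped run leaves that month's partition behind until it is dropped by hand.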
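The rotation arithmetic can also be mirrored outside the database, for example in a sidecar job that should catch up after a missed run. This is a sketch only: rotation_plan() is a hypothetical helper, and it assumes you supply the list of existing partition names yourself (for instance from a catalog query) and that they follow the outbox_dlq_YYYY_MM pattern used on this page.

```python
import re
from datetime import date
from typing import List, Tuple

PARTITION_NAME = re.compile(r"outbox_dlq_\d{4}_\d{2}")


def add_months(month_start: date, months: int) -> date:
    """Return the first day of the month `months` away from `month_start`."""
    index = month_start.year * 12 + (month_start.month - 1) + months
    return date(index // 12, index % 12 + 1, 1)


def rotation_plan(
    today: date, retain_months: int, existing: List[str]
) -> Tuple[str, List[str]]:
    """Return (partition to create, partitions to drop)."""
    this_month = today.replace(day=1)
    next_name = f"outbox_dlq_{add_months(this_month, 1):%Y_%m}"
    cutoff_name = f"outbox_dlq_{add_months(this_month, -retain_months):%Y_%m}"
    # Zero-padded YYYY_MM suffixes sort chronologically, so comparing the
    # names as strings selects the cutoff month and everything older.
    to_drop = sorted(
        name for name in existing
        if PARTITION_NAME.fullmatch(name) and name <= cutoff_name
    )
    return next_name, to_drop


next_name, to_drop = rotation_plan(
    date(2026, 6, 15),
    12,
    ["outbox_dlq_2025_04", "outbox_dlq_2025_06", "outbox_dlq_2025_07",
     "outbox_dlq_2026_06", "outbox_dlq_old"],
)
```

Unlike the SQL block, this variant selects every partition at or before the cutoff month, so a partition left behind by a skipped run is picked up on the next one.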