
Alembic migrations

The package never creates or migrates your schema — that's Alembic's job. This page shows what Alembic produces against make_outbox_table() and make_dlq_table(), and gives recipes for drift detection and DLQ retention via partition rotation.

Initial migration

Add make_outbox_table(metadata, table_name="outbox") to whatever MetaData your Alembic env.py exposes as target_metadata, then run alembic revision --autogenerate -m "outbox". The captured upgrade() body (SQLAlchemy 2.0.50, Alembic 1.18.4, Postgres 17, against an empty schema):

# ### commands auto generated by Alembic - please adjust! ###
op.create_table('outbox',
    sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False),
    sa.Column('queue', sa.String(length=255), nullable=False),
    sa.Column('payload', sa.LargeBinary(), nullable=False),
    sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('attempts_count', sa.BigInteger(), server_default='0', nullable=False),
    sa.Column('deliveries_count', sa.BigInteger(), server_default='0', nullable=False),
    sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
    sa.Column('next_attempt_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
    sa.Column('first_attempt_at', sa.DateTime(timezone=True), nullable=True),
    sa.Column('last_attempt_at', sa.DateTime(timezone=True), nullable=True),
    sa.Column('acquired_at', sa.DateTime(timezone=True), nullable=True),
    sa.Column('acquired_token', sa.Uuid(), nullable=True),
    sa.Column('timer_id', sa.String(length=255), nullable=True),
    sa.PrimaryKeyConstraint('id')
)
op.create_index('outbox_lease_idx', 'outbox', ['queue', 'acquired_at'], unique=False,
                postgresql_where=sa.text('acquired_token IS NOT NULL'))
op.create_index('outbox_pending_idx', 'outbox', ['queue', 'next_attempt_at'], unique=False,
                postgresql_where=sa.text('acquired_token IS NULL'))
op.create_index('outbox_timer_id_uq', 'outbox', ['queue', 'timer_id'], unique=True,
                postgresql_where=sa.text('timer_id IS NOT NULL'))
# ### end Alembic commands ###

The three indexes carry load-bearing partial predicates:

  • outbox_pending_idx(queue, next_attempt_at) WHERE acquired_token IS NULL. Branch A of the fetch CTE (unleased rows) is served by this index. The fetch query's WHERE clause is written so that the Postgres planner can prove it implies the index predicate and use the partial index; drop the predicate and the planner falls back to a sequential scan as the table grows.
  • outbox_lease_idx(queue, acquired_at) WHERE acquired_token IS NOT NULL. Branch B of the fetch CTE (expired-lease reclaim) is served by this index. Same story: the predicate is load-bearing for fetch performance.
  • outbox_timer_id_uq — unique (queue, timer_id) WHERE timer_id IS NOT NULL. Backs the timer_id dedup contract via pg_insert(...).on_conflict_do_nothing(...). Without the partial predicate, the unique constraint applies to all rows and breaks non-timer publishes.
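A quick way to check that the planner really uses these partial indexes is to run EXPLAIN (FORMAT JSON) on a query shaped like a fetch branch and walk the plan tree. The sketch below is not part of the package. PENDING_PROBE_SQL is a hypothetical query that only approximates Branch A, and the broker's real CTE may differ. scan_summary() assumes you have already fetched the EXPLAIN output and parsed it into Python objects, either with json.loads or through your driver's JSON decoding.

```python
import json
from typing import Any, Iterator

# Hypothetical probe approximating Branch A of the fetch CTE (not the
# broker's real query). It repeats the index predicate so the planner can
# match outbox_pending_idx.
PENDING_PROBE_SQL = """
EXPLAIN (FORMAT JSON)
SELECT id FROM outbox
WHERE queue = 'default'
  AND acquired_token IS NULL
  AND next_attempt_at <= now()
ORDER BY next_attempt_at
LIMIT 10
"""


def walk_plan(node: dict[str, Any]) -> Iterator[dict[str, Any]]:
    """Yield every node of an EXPLAIN (FORMAT JSON) plan tree, depth-first."""
    yield node
    for child in node.get("Plans", []):
        yield from walk_plan(child)


def scan_summary(explain_output: list[dict[str, Any]]) -> tuple[set[str], set[str]]:
    """Return (index names used, relations read by a sequential scan)."""
    indexes: set[str] = set()
    seq_scans: set[str] = set()
    for node in walk_plan(explain_output[0]["Plan"]):
        # Index Scan, Index Only Scan and Bitmap Index Scan carry "Index Name".
        if "Index Name" in node:
            indexes.add(node["Index Name"])
        if node.get("Node Type") == "Seq Scan":
            seq_scans.add(node["Relation Name"])
    return indexes, seq_scans


# A hand-written plan in the shape Postgres returns when the partial index is used.
sample = json.loads("""
[{"Plan": {"Node Type": "Limit", "Plans": [
  {"Node Type": "Index Scan", "Index Name": "outbox_pending_idx",
   "Relation Name": "outbox"}]}}]
""")
used, seq = scan_summary(sample)
print(used, seq)
```

On a nearly empty table the planner may legitimately prefer a sequential scan. Run the probe against realistically sized data before you treat a Seq Scan as a problem.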

Columns the operator can mostly ignore: attempts_count and deliveries_count (broker book-keeping for "handler invocations" vs "claims including expired-lease re-claims"), first_attempt_at and last_attempt_at (debugging aids on retry-heavy rows). All four are maintained by the broker; no application code touches them.

The # please adjust! comment from Alembic is misleading here — don't adjust. The column types, predicates, and indexes are exactly what the broker depends on. The validate_schema() check — when you wire it into a /health probe or CI gate — fails when the live DB drifts from this declaration. (It is opt-in; it never runs at broker.start().)

Adding the DLQ after the fact

To opt into Dead-letter queue audit, add make_dlq_table(metadata, table_name="outbox_dlq") to your MetaData and run alembic revision --autogenerate -m "outbox-dlq". The captured upgrade() body:

# ### commands auto generated by Alembic - please adjust! ###
op.create_table('outbox_dlq',
    sa.Column('id', sa.BigInteger(), autoincrement=True, nullable=False),
    sa.Column('original_id', sa.BigInteger(), nullable=False),
    sa.Column('queue', sa.String(length=255), nullable=False),
    sa.Column('payload', sa.LargeBinary(), nullable=False),
    sa.Column('headers', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
    sa.Column('deliveries_count', sa.BigInteger(), nullable=False),
    sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
    sa.Column('failed_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
    sa.Column('failure_reason', sa.String(length=64), nullable=False),
    sa.Column('last_exception', sa.String(), nullable=True),
    sa.Column('timer_id', sa.String(length=255), nullable=True),
    sa.PrimaryKeyConstraint('id')
)
op.create_index('outbox_dlq_queue_failed_idx', 'outbox_dlq', ['queue', 'failed_at'], unique=False)
# ### end Alembic commands ###

This is purely additive: no op.alter_table against the outbox table itself, no column add, no constraint flip. The runtime change that activates the DLQ is the broker's atomicity CTE (DELETE … RETURNING → INSERT INTO <dlq>), driven by OutboxBroker(..., dlq_table=…), not by schema state.

The non-unique outbox_dlq_queue_failed_idx on (queue, failed_at) supports "show me recent failures for queue X" queries. It also doubles as the pruning index once the table is converted to a partitioned one (§ DLQ retention via partition drop).
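A "recent failures" query filters on the index's leading column and sorts on its second column, so a (queue, failed_at) index can serve it without a separate sort step. The sketch below uses an in-memory SQLite table only so that it runs without a Postgres server. The SQL is portable, but ? is sqlite3's placeholder style (psycopg uses %s and asyncpg uses $1). The stand-in schema keeps just the columns the query touches, and the failure_reason values are placeholders rather than the package's real reason codes.

```python
import sqlite3

# Filter on the leading index column (queue), order by the second (failed_at).
RECENT_FAILURES_SQL = """
SELECT id, original_id, failure_reason, failed_at
FROM outbox_dlq
WHERE queue = ?
ORDER BY failed_at DESC
LIMIT ?
"""


def recent_failures(conn: sqlite3.Connection, queue: str, limit: int = 20) -> list[tuple]:
    """Newest DLQ rows for one queue."""
    return conn.execute(RECENT_FAILURES_SQL, (queue, limit)).fetchall()


# SQLite stand-in for outbox_dlq with only the columns the query needs.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE outbox_dlq (
    id INTEGER PRIMARY KEY,
    original_id INTEGER NOT NULL,
    queue TEXT NOT NULL,
    failed_at TEXT NOT NULL,
    failure_reason TEXT NOT NULL
);
CREATE INDEX outbox_dlq_queue_failed_idx ON outbox_dlq (queue, failed_at);
""")
conn.executemany(
    "INSERT INTO outbox_dlq (original_id, queue, failed_at, failure_reason) VALUES (?, ?, ?, ?)",
    [
        (10, "orders", "2026-06-01T10:00:00Z", "placeholder_reason"),
        (11, "orders", "2026-06-02T10:00:00Z", "placeholder_reason"),
        (12, "emails", "2026-06-03T10:00:00Z", "placeholder_reason"),
    ],
)
```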

Drift detection in CI

Run validate_schema() after alembic upgrade head in your CI pipeline. A small standalone script:

import asyncio

from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine

from faststream_outbox import OutboxBroker, make_outbox_table


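async def main() -> None:
    metadata = MetaData()
    outbox_table = make_outbox_table(metadata, table_name="outbox")
    engine = create_async_engine("postgresql+asyncpg://...")
    try:
        broker = OutboxBroker(engine, outbox_table=outbox_table)
        # Raises on drift; the exception escapes asyncio.run() and the
        # process exits non-zero.
        await broker.validate_schema()
    finally:
        # Close pooled connections cleanly before the event loop shuts down.
        await engine.dispose()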


asyncio.run(main())

validate_schema() raises on drift. The exception propagates out of asyncio.run(), the script exits non-zero, and CI fails before the deploy step.

This check is opt-in for /health and not always-on at broker.start(). The reason: a running migration plus an always-on validator would race. Operators must be able to roll forward a new schema version without spinning every pod into a crash loop. The drift check belongs between alembic upgrade head and the deploy step, not inside the running service.

If you also pass dlq_table=make_dlq_table(metadata) when constructing the broker, validate_schema() checks both tables in one call and surfaces drift on either one.

Both autogenerate and validate_schema() run with compare_server_default=False, so server-default drift is not detected — neither the autogenerated migration nor the drift gate will flag a column whose server_default is missing or wrong. The consequence that bites is a missing server_default=now() on next_attempt_at; see the server-defaults caveat in Schema validation.
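Neither gate sees server defaults, so a small supplemental CI check can read them directly from information_schema.columns. This is a sketch under assumptions and is not part of the package. DEFAULTS_SQL is the catalog query you would run with whatever driver you use, and missing_server_defaults() inspects the (column_name, column_default) rows it returns. The check is deliberately loose: it requires now() on the two timestamp columns and accepts any default on the counters. The column list comes from the autogenerated migration at the top of this page.

```python
from typing import Optional

# Catalog query; run it with your driver and pass the rows to the checker.
DEFAULTS_SQL = """
SELECT column_name, column_default
FROM information_schema.columns
WHERE table_schema = current_schema() AND table_name = 'outbox'
"""

# Columns whose server default the autogenerated migration declares.
# None means "any default is acceptable".
REQUIRED_DEFAULTS: dict[str, Optional[str]] = {
    "created_at": "now()",
    "next_attempt_at": "now()",
    "attempts_count": None,
    "deliveries_count": None,
}


def missing_server_defaults(rows: list[tuple[str, Optional[str]]]) -> list[str]:
    """Return human-readable problems; an empty list means the defaults look right."""
    live = dict(rows)
    problems = []
    for column, expected in REQUIRED_DEFAULTS.items():
        default = live.get(column)
        if default is None:
            problems.append(f"{column}: no server default (or column missing)")
        elif expected is not None and expected not in default:
            problems.append(f"{column}: default is {default!r}, expected {expected!r}")
    return problems
```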

Fixing drift autogenerate can't see

Two kinds of drift that validate_schema() reports cannot be remediated by alembic revision --autogenerate — the same blindness that let them drift in also stops autogenerate from emitting a fix:

  • The outbox_lease_ck CHECK constraint. Alembic's compare_metadata has no check-constraint comparator, so a missing or altered CHECK never appears in an autogenerated migration.
  • Partial-index predicates. Alembic's index comparator ignores postgresql_where, so an index that exists but was created non-partial, with the wrong WHERE, or (for outbox_timer_id_uq) non-unique is invisible to the diff.

When validate_schema() raises for one of these, its error ends with a pointer to this section. Re-running autogenerate produces an empty upgrade() — hand-write the migration instead, then re-run validate_schema() to confirm the drift is cleared.
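Before you hand-write the fix, it can help to see exactly how each live index differs. The pg_indexes view exposes pg_get_indexdef() output in its indexdef column, for example CREATE INDEX outbox_pending_idx ON public.outbox USING btree (queue, next_attempt_at) WHERE (acquired_token IS NULL). The helper below is hypothetical and not part of the package. It compares those strings against the shapes in the autogenerated migration, and its regex is deliberately simple, handling only btree indexes on plain column lists.

```python
import re

# Catalog query; run it with your driver and pass the rows to index_drift().
INDEXDEFS_SQL = """
SELECT indexname, indexdef FROM pg_indexes
WHERE schemaname = current_schema() AND tablename = 'outbox'
"""

# name -> (columns, unique, predicate), mirroring the autogenerated migration.
EXPECTED = {
    "outbox_pending_idx": (("queue", "next_attempt_at"), False, "acquired_token IS NULL"),
    "outbox_lease_idx": (("queue", "acquired_at"), False, "acquired_token IS NOT NULL"),
    "outbox_timer_id_uq": (("queue", "timer_id"), True, "timer_id IS NOT NULL"),
}

# Matches pg_get_indexdef() output for simple btree indexes on plain columns.
INDEXDEF_RE = re.compile(
    r"CREATE (?P<unique>UNIQUE )?INDEX (?P<name>\S+) ON \S+ USING btree "
    r"\((?P<cols>[^)]*)\)(?: WHERE \((?P<pred>.*)\))?$"
)


def index_drift(rows: list[tuple[str, str]]) -> dict[str, str]:
    """Map each drifted index name to a short description of the problem."""
    live = dict(rows)
    drift: dict[str, str] = {}
    for name, (cols, unique, pred) in EXPECTED.items():
        if name not in live:
            drift[name] = "missing"
            continue
        m = INDEXDEF_RE.match(live[name])
        if m is None:
            drift[name] = f"unrecognised definition: {live[name]}"
            continue
        problems = []
        live_cols = tuple(c.strip() for c in m["cols"].split(","))
        if live_cols != cols:
            problems.append(f"columns {live_cols}")
        if bool(m["unique"]) != unique:
            problems.append("unique" if m["unique"] else "not unique")
        if m["pred"] != pred:
            problems.append(f"predicate {m['pred']!r}" if m["pred"] else "no WHERE predicate")
        if problems:
            drift[name] = ", ".join(problems)
    return drift
```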

Restore the lease CHECK

# Drop first ONLY if the constraint exists with a wrong predicate; skip the
# drop if it is absent entirely.
op.drop_constraint('outbox_lease_ck', 'outbox', type_='check')
op.create_check_constraint(
    'outbox_lease_ck',
    'outbox',
    '(acquired_token IS NULL) = (acquired_at IS NULL)',
)
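op.create_check_constraint emits ALTER TABLE ... ADD CONSTRAINT, and Postgres verifies every existing row when a CHECK is added this way. If rows that break the lease invariant got in while the CHECK was missing, the migration fails. An audit query run beforehand shows which rows those are. The sketch below runs that query against an in-memory SQLite stand-in that has only the lease columns and no CHECK, so it executes without a Postgres server. The SQL itself also runs on Postgres. How to repair any offending rows depends on your situation and is not covered here.

```python
import sqlite3

# Rows where exactly one of the two lease columns is set; these would make
# ADD CONSTRAINT ... CHECK ((acquired_token IS NULL) = (acquired_at IS NULL)) fail.
LEASE_VIOLATIONS_SQL = """
SELECT id, acquired_token, acquired_at
FROM outbox
WHERE (acquired_token IS NULL) <> (acquired_at IS NULL)
ORDER BY id
"""


def lease_violations(conn) -> list[tuple]:
    """Return outbox rows that break the lease invariant (any DB-API connection)."""
    cur = conn.cursor()
    cur.execute(LEASE_VIOLATIONS_SQL)
    return cur.fetchall()


# SQLite stand-in with only the lease columns and no CHECK, so bad rows can exist.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, acquired_token TEXT, acquired_at TEXT)")
conn.executemany(
    "INSERT INTO outbox (id, acquired_token, acquired_at) VALUES (?, ?, ?)",
    [
        (1, None, None),                     # unleased: consistent
        (2, "tok", "2026-06-01T00:00:00Z"),  # leased: consistent
        (3, "tok", None),                    # token without timestamp: violates
        (4, None, "2026-06-01T00:00:00Z"),   # timestamp without token: violates
    ],
)
```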

Restore a partial index

Drop the drifted index and recreate it with its load-bearing predicate. The three indexes and their expected shape:

Index                Columns                  Unique   postgresql_where
outbox_pending_idx   queue, next_attempt_at   no       acquired_token IS NULL
outbox_lease_idx     queue, acquired_at       no       acquired_token IS NOT NULL
outbox_timer_id_uq   queue, timer_id          yes      timer_id IS NOT NULL

For example, to restore outbox_timer_id_uq:

op.drop_index('outbox_timer_id_uq', table_name='outbox')
op.create_index(
    'outbox_timer_id_uq',
    'outbox',
    ['queue', 'timer_id'],
    unique=True,
    postgresql_where=sa.text('timer_id IS NOT NULL'),
)

Substitute the columns / unique / predicate from the table above for outbox_pending_idx and outbox_lease_idx.

The recipes pass literal names ('outbox_lease_ck', 'outbox_timer_id_uq') — the exact names the package emits with no naming_convention.

Naming conventions: the CHECK name doesn't matter

validate_schema()'s CHECK probe matches the lease constraint by predicate, not name. So if your MetaData carries a SQLAlchemy naming_convention with a ck key, you don't need to match any particular name — create the constraint under whatever name your migration produces and it will validate, as long as its predicate is (acquired_token IS NULL) = (acquired_at IS NULL). The literal op.create_check_constraint('outbox_lease_ck', ...) recipe above is fine even under a convention.

(Why the probe ignores the name: a ck convention re-templates the in-memory CheckConstraint.name to e.g. ck_outbox_outbox_lease_ck, but a hand-written op.create_check_constraint('outbox_lease_ck', ...) creates the literal name verbatim — Alembic op functions don't apply the convention. The live name is therefore unpredictable, so the probe keys off the stable predicate instead.)

The index recipes still use literal names, because the explicitly-named indexes (outbox_pending_idx etc.) are not re-templated by the ix/uq convention keys — those only rename auto-named indexes.

DLQ retention via partition drop

Plain DELETE FROM outbox_dlq WHERE failed_at < now() - interval '90 days' works fine for low-volume DLQs (< ~1 GB / month) and needs no schema change. For higher volume, converting the DLQ to a range-partitioned table by failed_at lets you drop entire partitions instead of deleting row by row — orders-of-magnitude faster, no vacuum debt.

One-time migration to partitioned DLQ

# ### Convert outbox_dlq to range-partitioned by failed_at ###

# Rename the existing table out of the way. Renaming a table does not
# rename its indexes, so drop the old (queue, failed_at) index now;
# otherwise the CREATE INDEX below collides with its name.
op.rename_table('outbox_dlq', 'outbox_dlq_old')
op.drop_index('outbox_dlq_queue_failed_idx', table_name='outbox_dlq_old')

# Create the partitioned parent. The PRIMARY KEY must include the
# partition key, so we extend it to (id, failed_at). last_exception is
# VARCHAR to match the sa.String() column in the table declaration.
op.execute("""
    CREATE TABLE outbox_dlq (
        id              BIGSERIAL NOT NULL,
        original_id     BIGINT NOT NULL,
        queue           VARCHAR(255) NOT NULL,
        payload         BYTEA NOT NULL,
        headers         JSONB,
        deliveries_count BIGINT NOT NULL,
        created_at      TIMESTAMPTZ NOT NULL,
        failed_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
        failure_reason  VARCHAR(64) NOT NULL,
        last_exception  VARCHAR,
        timer_id        VARCHAR(255),
        PRIMARY KEY (id, failed_at)
    ) PARTITION BY RANGE (failed_at);
""")

op.execute("""
    CREATE INDEX outbox_dlq_queue_failed_idx
        ON outbox_dlq (queue, failed_at);
""")

# Create initial partitions covering the previous and current month.
# One statement per op.execute(): not every driver accepts several
# statements in a single execute() call.
op.execute("""
    CREATE TABLE outbox_dlq_2026_05 PARTITION OF outbox_dlq
        FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');
""")
op.execute("""
    CREATE TABLE outbox_dlq_2026_06 PARTITION OF outbox_dlq
        FOR VALUES FROM ('2026-06-01') TO ('2026-07-01');
""")

# Copy rows inside the retention window into the partitioned table.
# Older rows are discarded when outbox_dlq_old is dropped below.
op.execute("""
    INSERT INTO outbox_dlq
        SELECT * FROM outbox_dlq_old
        WHERE failed_at >= '2026-05-01';
""")

# Advance the new id sequence past the copied ids so fresh DLQ rows
# don't reuse them.
op.execute("""
    SELECT setval(pg_get_serial_sequence('outbox_dlq', 'id'),
                  COALESCE((SELECT max(id) FROM outbox_dlq), 1));
""")

op.drop_table('outbox_dlq_old')

validate_schema() continues to work against the partitioned table — Alembic's autogenerate ignores partition boundaries when comparing column shape.

Monthly cron: create next, drop oldest

Run it once a month from pg_cron (or pgAgent, or a sidecar job). Adjust the retention window (12 months below) to taste:

DO $$
DECLARE
    next_month      DATE := date_trunc('month', now() + interval '1 month');
    next_month_end  DATE := next_month + interval '1 month';
    drop_month      DATE := date_trunc('month', now() - interval '12 months');
    next_name       TEXT := 'outbox_dlq_' || to_char(next_month, 'YYYY_MM');
    drop_name       TEXT := 'outbox_dlq_' || to_char(drop_month, 'YYYY_MM');
BEGIN
    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS %I PARTITION OF outbox_dlq
            FOR VALUES FROM (%L) TO (%L);',
        next_name, next_month, next_month_end
    );
    EXECUTE format('DROP TABLE IF EXISTS %I;', drop_name);
END $$;

The pattern is "always have next month's partition before any row in it could land" + "drop everything older than the retention window." Both operations are O(1) regardless of DLQ row count.
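If rotation runs from a sidecar job instead of inside Postgres, the same logic is easy to express in Python. This sketch uses hypothetical helper names that are not part of the package. It only builds the DDL strings and leaves execution to your driver. It differs from the DO block in one deliberate way: it drops every partition at or before the cutoff month, not just the one exactly twelve months back, so a missed run doesn't leave an old partition behind permanently. The existing partition names come from PARTITIONS_SQL, and unqualified names are assumed (outbox_dlq on the search_path).

```python
import re
from datetime import date

# Lists current partitions of outbox_dlq; feed the names to rotation_ddl().
PARTITIONS_SQL = (
    "SELECT inhrelid::regclass::text FROM pg_inherits "
    "WHERE inhparent = 'outbox_dlq'::regclass"
)
PARTITION_RE = re.compile(r"outbox_dlq_\d{4}_\d{2}")


def month_start(d: date, offset: int = 0) -> date:
    """First day of the month that lies `offset` months from d's month."""
    index = d.year * 12 + (d.month - 1) + offset
    return date(index // 12, index % 12 + 1, 1)


def partition_name(month: date) -> str:
    return f"outbox_dlq_{month:%Y_%m}"


def rotation_ddl(today: date, existing: list[str], retention_months: int = 12) -> list[str]:
    """Create next month's partition; drop every partition at or before the cutoff."""
    nxt = month_start(today, 1)
    statements = [
        f"CREATE TABLE IF NOT EXISTS {partition_name(nxt)} PARTITION OF outbox_dlq "
        f"FOR VALUES FROM ('{nxt.isoformat()}') TO ('{month_start(nxt, 1).isoformat()}')"
    ]
    cutoff = partition_name(month_start(today, -retention_months))
    # Zero-padded YYYY_MM names sort chronologically as plain strings.
    for name in sorted(existing):
        if PARTITION_RE.fullmatch(name) and name <= cutoff:
            statements.append(f"DROP TABLE IF EXISTS {name}")
    return statements
```

Run on 2026-06-15 with the default retention, this creates outbox_dlq_2026_07 and drops outbox_dlq_2025_06 plus anything older, which matches what the DO block would do on a schedule with no missed runs.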