Dead-letter queue¶
Opt-in audit for terminal failures. Pass dlq_table=make_dlq_table(metadata)
to the broker and every row that fails terminally is copied into the DLQ in
the same Postgres statement as the outbox DELETE. Default behavior is
unchanged when dlq_table is omitted — no audit table, no new code paths.
Quickstart¶
Build the DLQ Table on the same MetaData as your outbox table so
Alembic discovers both, then wire it into the broker:
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine
from faststream_outbox import OutboxBroker, make_dlq_table, make_outbox_table
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
dlq_table = make_dlq_table(metadata, table_name="outbox_dlq")
engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
broker = OutboxBroker(engine, outbox_table=outbox_table, dlq_table=dlq_table)
The package does not create or migrate the table — run metadata.create_all
(or your Alembic migration) once both tables are declared. Subscribers and
publishers need no further configuration; the broker reads dlq_table from
its own config when it builds the terminal-flush SQL.
What gets archived¶
A row lands in the DLQ when it is terminal-by-failure, i.e. the
subscriber's terminal flush would otherwise DELETE the row because of a
failure (not a clean ack). Three paths produce that:
failure_reason |
Trigger |
|---|---|
max_deliveries |
deliveries_count > max_deliveries — handler is never invoked for this attempt. |
retry_terminal |
Handler raised; the retry strategy returned None (attempts / total-delay exhausted, or NoRetry()). |
rejected |
Handler called await msg.reject() directly, or AckPolicy.REJECT_ON_ERROR rejected the row on an exception. |
Successful rows (ack) are never archived; the success path stays a plain
DELETE and never touches the DLQ. See
Ack policy and
Retry strategies for the upstream
behaviors that decide which reason fires.
Schema reference¶
make_dlq_table(metadata, table_name="outbox_dlq") declares:
| Column | Type | Notes |
|---|---|---|
id |
BigInteger, PK, autoincrement |
DLQ row identity. |
original_id |
BigInteger, not null |
The outbox row's id, for operator forensics. Not unique — a re-delivered timer_id row could legitimately land here twice. |
queue |
String(255), not null |
Source queue name. |
payload |
LargeBinary, not null |
Verbatim copy of the outbox payload bytes. |
headers |
JSONB, nullable |
Verbatim copy, including the inherited correlation_id. |
deliveries_count |
BigInteger, not null |
Attempt count at the moment of failure. |
created_at |
DateTime(timezone=True), not null |
The outbox row's original created_at — measures time-to-terminal-failure. |
failed_at |
DateTime(timezone=True), not null, default now() |
When the audit row was written. |
failure_reason |
String(64), not null |
One of the three values in the table above. |
last_exception |
String, nullable |
repr() of the raised exception, bounded at 8 KiB (see below). None on manual reject() without an exception. |
Index: (queue, failed_at) (btree, non-unique) — supports "show me recent
failures for queue X" queries without a sequential scan as the DLQ grows.
No foreign key references the outbox table: the source row is gone in the
same transaction, so the constraint would be unsatisfiable. There is also no
LISTEN/NOTIFY channel — nobody polls the DLQ.
Atomicity¶
When dlq_table is configured, OutboxClient.delete_with_lease switches to
a single CTE statement:
WITH deleted AS (
DELETE FROM <outbox> WHERE id = :id AND acquired_token = :token
RETURNING id, queue, payload, headers, deliveries_count, created_at
)
INSERT INTO <dlq> (original_id, queue, payload, headers, deliveries_count,
created_at, failure_reason, last_exception)
SELECT id, queue, payload, headers, deliveries_count, created_at,
:failure_reason, :last_exception
FROM deleted;
Two operator-visible properties fall out of this shape:
- Lease-lost is a transparent no-op. If another worker reclaimed the
row after a lease expiry,
WHERE acquired_token = :tokenmatches nothing,deletedis empty, the INSERT inserts zero rows, and the caller seesrowcount == 0— same observable as the no-DLQ path. The lease-token guard documented in Subscriber is preserved. - DLQ-write failure rolls back the DELETE. If the INSERT fails
(column mismatch, disk full, ENUM violation), the whole statement
rolls back. The outbox row stays leased and is reclaimed when the
lease expires. Misconfiguration surfaces as outbox growth plus
lease_lostspikes rather than silent audit loss.
The statement runs on the worker's autocommit writer connection — one round-trip per terminal flush, same cost as the no-DLQ path.
last_exception truncation¶
The serialized exception (repr(exc)) is bounded at 8 KiB by
_LAST_EXCEPTION_MAX_CHARS in faststream_outbox/subscriber/usecase.py.
Anything longer is truncated and …[truncated] appended.
Rationale: some exceptions carry MB-scale payloads — pydantic validation
errors with the rejected request body, asyncpg DataError with the full
row, etc. An unbounded repr would extend the writer round-trip on a
poison row by hundreds of milliseconds and bloat the DLQ table. 8 KiB
preserves the traceback and any structured detail while bounding worst
case.
Schema validation¶
When dlq_table is set, await broker.validate_schema() checks both
tables and surfaces missing columns / indexes on either one. The DLQ
table is validated independently — drift in one table does not mask drift
in the other. See Schema validation for the
opt-in install + /health pattern.
Metric: dlq_written¶
_flush_terminal emits a dlq_written recorder event after the CTE
commits successfully. Skipped on the lease-lost path (no audit row was
written, so nothing to count).
Tags:
| Tag | Notes |
|---|---|
queue |
Source queue. |
subscriber |
Subscriber handler name (call_name). |
deliveries_count |
Attempt count at terminal flush. |
failure_reason |
Same value set as the schema column. |
exception_type |
Present only when last_exception was set (omitted for max_deliveries and manual reject() without an exception). |
The bundled adapters surface the event without further wiring:
- Prometheus: counter
faststream_outbox_dlq_written_total{reason}. - OpenTelemetry: counter
messaging.outbox.dlq_writtenwith themessaging.outbox.dlq_reasonattribute and the standarderror.typeattribute when present.
Pair with nacked_terminal to alert on DLQ misconfiguration: every
terminal-failure row should produce one nacked_terminal and one
dlq_written. A persistent divergence (terminal rate > DLQ rate) means
either the CTE keeps rolling back (DLQ schema drift) or the lease keeps
expiring before flush (lease_ttl_seconds too low for handler P99) —
both are operator-actionable signals. See
Observability for the broader recorder + middleware
story.
Retention¶
There is no built-in pruning. Operators are responsible for archival or expiry.
Recommended pattern: partition the DLQ by failed_at (monthly or
weekly) and drop old partitions via a cron job. The (queue, failed_at)
index already supports partition pruning in operator queries; convert it
to a partitioned table at create time if you expect a steady DLQ
inflow.
For low-volume DLQs a plain DELETE FROM <dlq> WHERE failed_at < now() -
interval '90 days' from a daily cron is enough.
Test broker¶
TestOutboxBroker accumulates audit rows in
broker.fake_client.dlq_rows so tests can assert on archive content
without a real Postgres. The fake mirrors the production CTE
side-effect: the source row is removed from fake_client.rows and an
audit dict is appended to fake_client.dlq_rows in the same call.
from faststream_outbox import NoRetry, OutboxBroker, TestOutboxBroker, make_dlq_table, make_outbox_table
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
dlq_table = make_dlq_table(metadata, table_name="outbox_dlq")
broker = OutboxBroker(outbox_table=outbox_table, dlq_table=dlq_table)
@broker.subscriber("orders", retry_strategy=NoRetry())
async def handle(body: dict) -> None:
raise RuntimeError("boom")
test_broker = TestOutboxBroker(broker)
async with test_broker:
await broker.publish({"order_id": 1}, queue="orders")
assert test_broker.fake_client.rows == []
assert len(test_broker.fake_client.dlq_rows) == 1
assert test_broker.fake_client.dlq_rows[0]["failure_reason"] == "retry_terminal"
See Testing for the broader test-broker contract.