Skip to content

Subscriber

Use @broker.subscriber(queue, ...) to register a handler for a queue.

Basic example

from faststream import FastStream
from faststream_outbox import OutboxBroker

broker: OutboxBroker = ...
app = FastStream(broker)


@broker.subscriber("orders")
async def handle(order_id: int) -> None:
    print(f"order {order_id}")

Body types

FastStream deserializes the message body into the annotated type. Any JSON-serializable type works:

from dataclasses import dataclass


@dataclass
class Order:
    order_id: str
    amount: float


@broker.subscriber("orders")
async def handle(body: Order) -> None:
    print(f"order {body.order_id} for {body.amount}")

Annotated handler params

faststream_outbox.annotations exports Annotated[..., Context(...)] shortcuts so handler signatures stay concise:

from faststream_outbox.annotations import OutboxBroker, OutboxMessage


@broker.subscriber("orders")
async def handle(msg: OutboxMessage, broker: OutboxBroker) -> None: ...

OutboxMessage, OutboxBroker, OutboxProducer, and OutboxClient are all available. For FastAPI handlers, import the same names from faststream_outbox.fastapi — they resolve via the same Context() paths but go through FastAPI's dependency resolver so Depends(...) and these shortcuts can be mixed freely.

Subscriber options

Per-subscriber knobs, passed to @broker.subscriber("…", …):

Parameter Default Description
max_workers 1 Concurrent handlers per subscriber
fetch_batch_size 10 Rows claimed per fetch cycle
min_fetch_interval 1.0 s Base poll interval; the floor used when the queue has work
max_fetch_interval 10.0 s Ceiling for the adaptive idle backoff (with jitter)
lease_ttl_seconds 60.0 s How long a claim is valid before another fetch may reclaim it. Must exceed your handler's P99 with margin.
max_deliveries None (unbounded) Total claims (including lease-expiry re-claims) after which the row is dropped without invoking the handler. Defends against handlers that consistently wedge.
ack_policy AckPolicy.NACK_ON_ERROR See Ack policy
retry_strategy ExponentialRetry(...) See Retry strategies
@broker.subscriber(
    "high-priority",
    max_workers=8,
    fetch_batch_size=50,
    min_fetch_interval=0.1,
    max_fetch_interval=1.0,
    lease_ttl_seconds=120.0,
)
async def handle_urgent(body: dict) -> None: ...

The factory in subscriber/factory.py warns or raises on likely-wrong combinations (lease_ttl_seconds <= max_fetch_interval, max_deliveries without retry, min_fetch_interval > max_fetch_interval, etc.).

Slow handlers — dedicated queue

When a handler's tail latency exceeds the subscriber's lease_ttl_seconds, the row's lease expires mid-flight and another fetch reclaims it → duplicate delivery. Don't hike lease_ttl_seconds globally — that delays reclaim of actually stuck rows everywhere. Instead, segregate slow work onto its own subscriber with a longer TTL:

@broker.subscriber("slow_q", lease_ttl_seconds=600)   # 10 minutes
async def heavy_job(msg): ...


@broker.subscriber("fast_q", lease_ttl_seconds=30)
async def quick_job(msg): ...

Pick lease_ttl_seconds strictly greater than that subscriber's P99 handler duration, with margin for clock skew. The tight TTL on the fast queue keeps stuck-row reclaim fast; the tall TTL on the slow queue tolerates outliers without slowing reclaim of genuinely stuck rows elsewhere. Producers route to the appropriate queue at publish time.

Ack policy

The default is AckPolicy.NACK_ON_ERROR: on a handler exception, the retry strategy decides whether to schedule another attempt or terminally drop the row.

Policy Effect
AckPolicy.NACK_ON_ERROR (default) Consult the retry strategy on handler exceptions
AckPolicy.REJECT_ON_ERROR Delete on the first failure (the retry strategy is ignored)
AckPolicy.MANUAL Handler must call await msg.ack() / nack() / reject() itself
AckPolicy.ACK_FIRST Not supported. Passing it raises ValueError at registration

ACK_FIRST would delete the row before the handler runs, so a handler crash silently drops the message — defeating the outbox reliability guarantee. The factory rejects it at registration.

from faststream import AckPolicy


@broker.subscriber("audit", ack_policy=AckPolicy.MANUAL)
async def handle(msg, body: dict) -> None:
    try:
        await write_audit(body)
        await msg.ack()
    except TransientError:
        await msg.nack()    # retry
    except PermanentError:
        await msg.reject()  # terminal delete

Retry strategies

A subscriber with no explicit retry_strategy defaults to ExponentialRetry(initial_delay_seconds=1.0, multiplier=2.0, max_delay_seconds=300.0, max_attempts=10, jitter_factor=0.2). Defaulting to "delete on first error" is the wrong contract for an outbox; users wanting that behavior must explicitly pass NoRetry().

from faststream_outbox import ExponentialRetry, ConstantRetry, LinearRetry, NoRetry


@broker.subscriber(
    "orders",
    retry_strategy=ExponentialRetry(
        initial_delay_seconds=1.0,
        max_delay_seconds=300.0,
        max_attempts=5,
        jitter_factor=0.5,
    ),
)
async def handle(order_id: int) -> None: ...


@broker.subscriber("audit", retry_strategy=NoRetry())  # opt out of retries
async def handle_audit(payload: dict) -> None: ...

ConstantRetry and LinearRetry accept jitter_factor (default 0.0); when non-zero, the computed delay is multiplied by 1 + U(-jitter_factor/2, +jitter_factor/2) to spread out retries, matching ExponentialRetry's shape.

Retry only on transient errors

Strategies receive the raised exception so users may subclass for "retry only on transient errors":

class TransientOnly(ExponentialRetry):
    def get_next_attempt_at(self, *, exception=None, **kw):
        if exception and not isinstance(exception, TransientError):
            return None  # terminal — DELETE
        return super().get_next_attempt_at(exception=exception, **kw)

Returning None from get_next_attempt_at signals a terminal failure. _RetryStrategyTemplate also enforces max_attempts and max_total_delay_seconds for you.

Connection budget

Each subscriber holds max_workers + 1 long-lived SQLAlchemy pool connections (one writer per worker + one fetch), plus one raw asyncpg connection for LISTEN when available. Size your engine for Σ subscribers × (max_workers + 1) or broker.start() will block on pool checkout. SQLAlchemy's default pool_size=5, max_overflow=10 covers a handful of single-worker subscribers; raise it for larger fleets.

The formula is per process. Each replica opens its own pool, so your Postgres max_connections needs to cover replicas × Σ subscribers × (max_workers + 1) — otherwise additional replicas (or rolling deployments) will be refused at startup with FATAL: too many connections.

Read-only inspection

subscriber.get_one() and async for msg in subscriber: are not supported on OutboxSubscriber — they would acquire a lease and bump deliveries_count, surprising semantics for a peek API. Use broker.fetch_unprocessed(session=..., queue=...) for lease-free reads of the current table state.