Skip to content

Timers

Schedule an outbox row to fire later by passing activate_in (relative) or activate_at (absolute, tz-aware) — exactly one. Pass timer_id to deduplicate per (queue, timer_id); cancel a not-yet-leased timer with broker.cancel_timer(...).

Scheduling

import datetime as dt


# Fire 30 seconds from now, deduplicated by timer_id:
await broker.publish(
    {"order_id": 1},
    queue="orders",
    session=session,
    activate_in=dt.timedelta(seconds=30),
    timer_id=f"order-confirm-{order.id}",
)

# Fire at a specific UTC instant:
await broker.publish(
    {"x": 1}, queue="orders", session=session,
    activate_at=dt.datetime(2026, 6, 1, 9, tzinfo=dt.UTC),
)

publish returns the inserted row's id, or None if a row with the same (queue, timer_id) already exists.

Mutually exclusive

Passing both activate_in and activate_at raises ValueError. They are two ways to say the same thing — "make this row invisible to fetch until the given moment".

Timezone-aware activate_at

activate_at must be timezone-aware. A naive datetime raises an explicit ValueError rather than guessing your intended zone.

Server-side vs client-side scheduling

For publish, next_attempt_at is computed server-side via now() + make_interval(secs => :s) to stay clock-skew-safe. For publish_batch, it's client-side (datetime.now(UTC) + activate_in) because executemany doesn't compose cleanly with column-level SQL expressions, and the few-ms drift is harmless for user-supplied scheduling.

Deduplication with timer_id

timer_id flows into a String(255) column with a partial unique index on (queue, timer_id) WHERE timer_id IS NOT NULL. The producer switches to pg_insert(...).on_conflict_do_nothing(...) so re-publishing the same id is a silent no-op (returns None):

first = await broker.publish(
    {"order_id": 1}, queue="orders", session=session,
    activate_in=dt.timedelta(seconds=30),
    timer_id="order-confirm-1",
)
assert first is not None

# Re-publish — no row inserted, no NOTIFY emitted, returns None.
second = await broker.publish(
    {"order_id": 1}, queue="orders", session=session,
    activate_in=dt.timedelta(seconds=30),
    timer_id="order-confirm-1",
)
assert second is None

NOTIFY is skipped when activate_in / activate_at is set OR the conflict path returned no row — both cases would either wake listeners that find nothing, or wake them prematurely.

timer_id is only available on single publish, not on publish_batch (per-row dedup makes no sense for a batch).

Cancellation

broker.cancel_timer(*, queue, timer_id, session) issues a DELETE on the caller's session, but only if the row is not yet leased:

deleted = await broker.cancel_timer(
    queue="orders",
    timer_id="order-confirm-42",
    session=session,
)
# True if a row was deleted; False if it didn't exist or was already in flight.

The underlying SQL is DELETE WHERE queue=? AND timer_id=? AND acquired_token IS NULL. The acquired_token IS NULL guard is load-bearing: it preserves the lease-token invariant by refusing to clobber a row whose handler is already running. If the timer fired in the race window between your application logic deciding to cancel and the DELETE landing, the delivery completes normally and cancel_timer returns False.

Latency floor

Timer firing latency is bounded by the subscriber's max_fetch_interval (default 10 seconds) after next_attempt_at elapses. NOTIFY does not help here — listeners can't act on a future row, so the fetch loop has to poll for it.

Lower max_fetch_interval for sub-10s precision. Sub-second precision is not a goal of this broker; if you need it, schedule the row early and sleep inside the handler, or use a different scheduler.

Test broker note

In tests using TestOutboxBroker (default run_loops=False mode), activate_in / activate_at are ignored and timers fire immediately — sync dispatch ignores next_attempt_at. This trades production parity for test ergonomics: tests can assert handler effects without time travel.

The schedule is still recorded on the fake row (broker.fake_client.rows[0].next_attempt_at) if a test needs to assert on it. Pass run_loops=True if you need scheduled delivery to actually wait. See Testing.