Setup Prometheus and OpenTelemetry¶
You've decided to wire metrics. This page is the recipe. For the why two instrumentation seams, see Concepts § Instrumentation seams; for the event catalog and operator PromQL playbook, see Reference § Observability.
Prometheus adapter¶
Drop-in compatible with FastStream's PrometheusMiddleware. Metric
names, label set, status enum, histogram buckets, and constructor args
all mirror upstream.
# app.py — run with `uvicorn app:app --host 0.0.0.0 --port 8000`
from faststream.asgi import AsgiFastStream, make_ping_asgi
from prometheus_client import REGISTRY, make_asgi_app
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine
from faststream_outbox import OutboxBroker, make_outbox_table
from faststream_outbox.metrics.prometheus import PrometheusRecorder
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
broker = OutboxBroker(
engine,
outbox_table=outbox_table,
metrics_recorder=PrometheusRecorder(app_name="checkout", registry=REGISTRY),
)
@broker.subscriber("orders", max_workers=4)
async def handle_order(body: dict) -> None: ...
app = AsgiFastStream(
broker,
asgi_routes=[
("/metrics", make_asgi_app(registry=REGISTRY)),
("/healthz", make_ping_asgi(broker, timeout=2.0)),
],
)
AsgiFastStream accepts any ASGI sub-app under asgi_routes; mount
make_asgi_app(REGISTRY) to expose Prometheus exposition without
pulling FastAPI in. make_ping_asgi(broker) is FastStream's built-in
liveness probe — handy for Kubernetes.
The broker label is always "outbox"; existing FastStream Grafana
dashboards keep working — add broker="outbox" to the PromQL filter.
Consume vs publish label set¶
The adapter uses a different label set for consume vs publish, matching upstream verbatim:
- Consume tags by
handler(the subscriber) - Publish tags by
destination(the queue)
See Observability § PromQL playbook for the operator query catalog.
OpenTelemetry adapter¶
Drop-in compatible with FastStream's TelemetryMiddleware, meter
only — no spans (use the native middleware
section below if you need spans).
pip install 'faststream-outbox[opentelemetry,prometheus]' \
opentelemetry-exporter-prometheus uvicorn
# app.py — run with `uvicorn app:app --host 0.0.0.0 --port 8000`
from faststream.asgi import AsgiFastStream
from opentelemetry import metrics
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from prometheus_client import REGISTRY, make_asgi_app
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine
from faststream_outbox import OutboxBroker, make_outbox_table
from faststream_outbox.metrics.opentelemetry import OpenTelemetryRecorder
# OTel meters → Prometheus reader (scraped at /metrics below)
prometheus_reader = PrometheusMetricReader()
meter_provider = MeterProvider(metric_readers=[prometheus_reader])
metrics.set_meter_provider(meter_provider)
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
broker = OutboxBroker(
engine,
outbox_table=outbox_table,
metrics_recorder=OpenTelemetryRecorder(meter_provider=meter_provider),
)
@broker.subscriber("orders", max_workers=4)
async def handle_order(body: dict) -> None: ...
app = AsgiFastStream(broker, asgi_routes=[("/metrics", make_asgi_app(registry=REGISTRY))])
The PrometheusMetricReader converts OTel meter data points to
Prometheus exposition format on /metrics; for OTLP push instead,
swap the reader for PeriodicExportingMetricReader(OTLPMetricExporter(...))
and drop the /metrics route.
Instrument names match faststream.opentelemetry.TelemetryMiddleware for
the bus-scope metrics — messaging.process.duration,
messaging.publish.duration, and (when include_messages_counters=True)
messaging.process.messages / messaging.publish.messages — plus three
outbox-specific counters the middleware can't emit:
messaging.outbox.fetch.batches, messaging.outbox.lease_lost, and
messaging.outbox.dlq_written. Units and constructor args
(meter_provider, meter, include_messages_counters) follow upstream.
The messaging.system="outbox" attribute disambiguates outbox traffic
from Kafka / Rabbit data on the same instruments.
Tracing (spans) is not modelled by this adapter — the callable seam can't bracket a span lifecycle. For spans, use the native middleware integration below.
Native middleware (spans + bus parity)¶
For OTel spans wrapping consume_scope / publish_scope and the
exact upstream label / instrument schema, register the native
middleware subclasses via middlewares=[...] — same
registration pattern as KafkaPrometheusMiddleware /
RabbitTelemetryMiddleware.
Both seams together¶
The recommended setup pairs middleware with the recorder so every event the bus emits and every outbox-internal event lands in one observability stack:
pip install 'faststream-outbox[opentelemetry,prometheus]' \
opentelemetry-exporter-otlp opentelemetry-exporter-prometheus uvicorn
# app.py — run with `OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 \
# uvicorn app:app --host 0.0.0.0 --port 8000`
from faststream.asgi import AsgiFastStream
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from prometheus_client import CollectorRegistry, make_asgi_app
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import create_async_engine
from faststream_outbox import OutboxBroker, make_outbox_table
from faststream_outbox.metrics.prometheus import PrometheusRecorder
from faststream_outbox.opentelemetry import OutboxTelemetryMiddleware
from faststream_outbox.prometheus import OutboxPrometheusMiddleware
# ----- OTel SDK -----
resource = Resource.create({"service.name": "my-outbox-service"})
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(tracer_provider)
meter_provider = MeterProvider(resource=resource, metric_readers=[PrometheusMetricReader()])
metrics.set_meter_provider(meter_provider)
# Two registries: the middleware and the recorder both define the same
# faststream_* consume/publish collectors, so sharing one registry raises
# "Duplicated timeseries in CollectorRegistry" at broker construction.
MIDDLEWARE_REGISTRY = CollectorRegistry()
RECORDER_REGISTRY = CollectorRegistry()
# ----- Outbox broker -----
metadata = MetaData()
outbox_table = make_outbox_table(metadata, table_name="outbox")
engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox")
broker = OutboxBroker(
engine,
outbox_table=outbox_table,
middlewares=[
# Bus-scope spans + meters around consume_scope / publish_scope.
OutboxTelemetryMiddleware(tracer_provider=tracer_provider, meter_provider=meter_provider),
OutboxPrometheusMiddleware(registry=MIDDLEWARE_REGISTRY, app_name="my-outbox-service"),
],
# Outbox-internal events (fetched, lease_lost, terminal reasons, dlq_written)
# that have no message context and can't reach the middleware bus.
metrics_recorder=PrometheusRecorder(registry=RECORDER_REGISTRY, app_name="my-outbox-service"),
)
@broker.subscriber("orders", max_workers=4)
async def handle_order(body: dict) -> None: ...
app = AsgiFastStream(
broker,
asgi_routes=[
("/metrics", make_asgi_app(registry=MIDDLEWARE_REGISTRY)),
("/metrics/outbox", make_asgi_app(registry=RECORDER_REGISTRY)),
],
)
Traces flow to OTLP (Jaeger / Tempo / Honeycomb / collector); the
middleware's meters land on /metrics and the recorder's outbox-internal
counters on /metrics/outbox for Prometheus to scrape — two scrape targets,
one process.
The two seams overlap on consume/publish series. Both the middleware
and the recorder emit the same faststream_received_* / faststream_published_*
collectors, which is why they must live on separate registries (above) —
sharing one raises Duplicated timeseries in CollectorRegistry at broker
construction, and summing across both double-counts every consume and
publish. Treat the middleware as the source of truth for consume/publish;
the recorder's unique value is the outbox-internal events the middleware
can't see (fetched, lease_lost, terminal reasons, dlq_written).
The providers set messaging.system = "outbox", matching the
recorder-seam adapters. The OTel provider maps row.id →
messaging.message.id, row.queue → messaging.destination_publish.name,
correlation_id → messaging.message.conversation_id, len(payload) →
messaging.message.payload_size_bytes, and len(cmd.batch_bodies) →
messaging.batch.message_count when >1.