On this page
Guide to fastware SSE broadcasting: typed events, per-client async queues, heartbeat keep-alive, strict mode, and automatic disconnect pruning.
#SSE Broadcasting
Server-Sent Events (SSE) provide a simple, HTTP-based mechanism for pushing data from server to client. Unlike WebSockets, SSE is unidirectional (server to client only), uses plain HTTP, works through proxies and firewalls, and reconnects automatically.
#When to use SSE vs WebSocket
| Criterion | SSE | WebSocket |
|---|---|---|
| Direction | Server to client only | Bidirectional |
| Protocol | HTTP (text/event-stream) | Upgraded connection (ws://) |
| Reconnection | Built into the browser (EventSource auto-reconnects) | Manual reconnection logic required |
| Proxy/firewall | Works through standard HTTP infrastructure | May be blocked by some proxies |
| Use case | Live dashboards, notifications, progress updates, log tailing | Chat, collaborative editing, gaming |
Use SSE when you only need to push data to the client. Use WebSockets when the client needs to send data back over the same connection.
#Basic setup
#1. Create a Broadcaster
from fastware import Broadcaster, sse_route
broadcaster = Broadcaster()The Broadcaster manages a list of connected clients. Each client gets its own async queue. When you broadcast an event, it is pushed to every client's queue.
#2. Register event types
broadcaster.register_event("update")
broadcaster.register_event("error")
broadcaster.register_event("heartbeat")By default, the Broadcaster runs in strict mode -- broadcasting an unregistered event name raises ValueError. This prevents typos and ensures the event vocabulary is explicit.
#3. Wire to a route
from fastware import Router, create_app
router = Router()
router.add_route("GET", "/events", sse_route(broadcaster))The sse_route helper returns an async handler that calls broadcaster.stream(request), which creates a per-client queue and returns a StreamResponse with content-type: text/event-stream.
#4. Broadcast from handlers
@router.post("/items")
async def create_item(request):
item = request.json
# ... save to database ...
broadcaster.broadcast("update", {"action": "created", "item": item})
return {"ok": True}broadcast() is synchronous -- it pushes the formatted SSE message to every client queue without awaiting. Clients whose queues are full (they fell behind) are pruned automatically.
#5. Create the app
app = create_app(router)#Client-side JavaScript
const source = new EventSource("/events");
source.addEventListener("update", (event) => {
const data = JSON.parse(event.data);
console.log("Update received:", data);
});
source.addEventListener("error", (event) => {
const data = JSON.parse(event.data);
console.error("Server error:", data);
});
// Connection status
source.onopen = () => console.log("SSE connected");
source.onerror = () => console.log("SSE reconnecting...");The browser's EventSource automatically reconnects if the connection drops. Events are dispatched by their event: field, which maps to the first argument of broadcaster.broadcast().
#Heartbeat configuration
Long-lived SSE connections can be silently dropped by proxies, load balancers, or firewalls that enforce idle timeouts. Heartbeats prevent this by sending periodic SSE comments (: heartbeat\n\n) that keep the connection alive without triggering client-side event handlers.
broadcaster = Broadcaster(heartbeat_interval=30) # secondsWhen heartbeat_interval is set, the event generator sends a comment line if no real event arrives within the interval. SSE comments (lines starting with :) are ignored by EventSource -- they keep the TCP connection alive without producing a JavaScript event.
If heartbeat_interval is None (the default), no heartbeats are sent and the generator blocks indefinitely waiting for real events.
#Strict mode vs permissive mode
The Broadcaster supports 2 event validation modes that control whether event type names must be pre-registered before broadcasting. Strict mode (the default) catches typos at development time; permissive mode allows dynamic event vocabularies:
Strict mode (default, strict=True):
broadcaster = Broadcaster(strict=True)
broadcaster.register_event("update")
broadcaster.broadcast("update", {"ok": True}) # works
broadcaster.broadcast("typo", {"ok": True}) # raises ValueErrorStrict mode catches typos and enforces a declared event vocabulary. Register all event types before broadcasting.
Permissive mode (strict=False):
broadcaster = Broadcaster(strict=False)
broadcaster.broadcast("anything", {"ok": True}) # works without registrationPermissive mode skips event type validation. Use this when event types are dynamic or user-defined.
#Buffer size
Each client gets an async queue with a configurable maximum size. The default buffer holds 256 messages per client. Slow consumers whose queues fill up are automatically pruned on the next broadcast() call to prevent unbounded memory growth:
broadcaster = Broadcaster(buffer_size=512)When a client's queue is full (the client is not consuming messages fast enough), the client is pruned from the client list on the next broadcast() call. This prevents a slow consumer from causing memory growth.
#Introspection
broadcaster.client_count # number of connected clients
broadcaster.event_types # frozenset of registered event types#Complete example
from fastware import Router, Broadcaster, sse_route, create_app, serve
broadcaster = Broadcaster(heartbeat_interval=30)
broadcaster.register_event("message")
broadcaster.register_event("status")
router = Router()
router.add_route("GET", "/events", sse_route(broadcaster))
@router.post("/send")
async def send_message(request):
data = request.json
broadcaster.broadcast("message", {"text": data["text"]})
return {"sent": True}
@router.get("/status")
async def get_status(request):
broadcaster.broadcast("status", {"clients": broadcaster.client_count})
return {"clients": broadcaster.client_count}
app = create_app(router)
if __name__ == "__main__":
serve(app, foreground=True, host="127.0.0.1", port=8000)#API reference
See the full Broadcaster class reference below, which documents the register_event method for declaring event types, the broadcast method for pushing events to all connected clients, the stream method for creating per-client SSE response generators, and the sse_route helper function for wiring a Broadcaster to a route:
#src.fastware.sse
SSE (Server-Sent Events) broadcaster with typed event registration, per-client async queues, automatic disconnect pruning, and strict mode enforcement.
#Broadcaster
Manages SSE client connections and broadcasts typed events.
Event types must be registered via register_event before they can be broadcast. In strict mode (the default), broadcasting an unregistered event raises ValueError. Pass strict=False to skip validation.
#register_event
def register_event(self, name: str) -> NoneDeclare an allowed event type.
#event_types
def event_types(self) -> frozenset[str]Currently registered event types.
#_format_sse
def _format_sse(self, event: str, data: dict[str, Any] | str) -> strFormat a payload as an SSE wire message.
#broadcast
def broadcast(self, event: str, data: dict[str, Any] | str) -> NoneSend an event to all connected clients.
Prunes clients whose queues are full (they fell behind and are presumed disconnected or stuck).
Raises ValueError if event was not previously registered and the broadcaster is in strict mode.
#_event_generator
async def _event_generator(self, queue: asyncio.Queue[str]) -> AsyncGenerator[str, None]Yield SSE messages from a per-client queue.
When heartbeat_interval is set, yields SSE comment heartbeats (": heartbeat\n\n") if no real message arrives within the interval.
#stream
async def stream(self, request: Request) -> StreamResponseReturn a StreamResponse for an SSE endpoint.
Creates a per-client queue, registers it, and wraps the async generator in the framework's streaming response type.
#client_count
def client_count(self) -> intNumber of currently connected SSE clients.
#sse_route
def sse_route(broadcaster: Broadcaster)Return an async handler suitable for router.add_route("GET", "/events", handler).