API reference for fastware auth: JWT token creation and verification, bcrypt password hashing, user stores, CSRF middleware, and rate limiting.
#Auth API Reference
Warning
The auth module requires the fastware[auth] extra, which installs PyJWT and bcrypt. Install with:
uv add "fastware[auth]"
Importing fastware.auth without these dependencies will raise ImportError at call time.
The auth module provides JWT token operations, password hashing, user storage, CSRF protection, rate limiting, and session cookie management. All functions are pure and DI-compatible, with no framework-specific dependencies beyond fastware's own ASGI types.
#src.fastware.auth
Authentication module providing JWT token creation and verification, bcrypt password hashing, user storage, CSRF protection, and rate limiting.
Pure functions and DI-compatible factories with no framework-specific dependencies beyond fastware's own ASGI types. This keeps auth logic testable and reusable.
#create_token
def create_token(username: str, role: str, secret: str, expires_hours: int = 720) -> str
Create a signed JWT with sub, role, exp, and iat claims (HS256).
#verify_token
def verify_token(token: str, secret: str) -> dict[str, Any] | None
Decode and validate a JWT. Returns claims dict or None on any error.
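To make the token format concrete, here is a minimal stdlib-only sketch of HS256 signing and verification with the same four claims. It is not fastware's implementation (the module relies on PyJWT); the names sketch_create_token and sketch_verify_token, and the now parameter used to pin the clock, are hypothetical.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    """HMAC-SHA256 over 'header.payload', which is what HS256 means."""
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()


def sketch_create_token(username: str, role: str, secret: str,
                        expires_hours: int = 720, now: Optional[int] = None) -> str:
    """Build header.payload.signature with sub, role, exp, and iat claims."""
    issued = int(time.time()) if now is None else now
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"sub": username, "role": role,
              "iat": issued, "exp": issued + expires_hours * 3600}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def sketch_verify_token(token: str, secret: str,
                        now: Optional[int] = None) -> Optional[dict[str, Any]]:
    """Return the claims, or None on any error (shape, signature, expiry)."""
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
            return None
        expected = _sign(f"{header_b64}.{payload_b64}", secret)
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
            return None
        claims = json.loads(_b64url_decode(payload_b64))
        current = int(time.time()) if now is None else now
        if claims["exp"] <= current:
            return None
        return claims
    except (ValueError, KeyError, TypeError, AttributeError):
        return None
```

Returning None instead of raising mirrors the documented contract of verify_token, so callers only need a single truthiness check.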
#hash_password
def hash_password(plain: str) -> str
Hash a plaintext password with bcrypt.
#verify_password
def verify_password(plain: str, hashed: str) -> bool
Check a plaintext password against a bcrypt hash.
#UserStore
Abstract user storage interface.
#load_users
def load_users(self) -> list[dict[str, str]]
#save_users
def save_users(self, users: list[dict[str, str]]) -> None
#find_user
def find_user(self, username: str) -> dict[str, str] | None
#create_user
def create_user(self, username: str, password: str, role: str) -> dict[str, str]
Create a new user. Raises ValueError if username already exists.
#delete_user
def delete_user(self, username: str) -> None
Delete a user by username. Raises LookupError if not found.
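The five methods above can be illustrated with a small in-memory store, which is handy in tests. This is a sketch under assumptions: InMemoryUserStore is a hypothetical class that mirrors the interface without subclassing UserStore, the record keys "username", "password_hash", and "role" are assumed (the latter two appear in the login example on this page), and the hasher is injected so the block runs without bcrypt.

```python
from typing import Callable, Optional


class InMemoryUserStore:
    """Hypothetical store exposing the same five methods as UserStore."""

    def __init__(self, hasher: Callable[[str], str]) -> None:
        # In real use the hasher would likely be fastware.auth.hash_password.
        self._hasher = hasher
        self._users: list[dict[str, str]] = []

    def load_users(self) -> list[dict[str, str]]:
        # Return copies so callers cannot mutate stored records in place.
        return [dict(user) for user in self._users]

    def save_users(self, users: list[dict[str, str]]) -> None:
        self._users = [dict(user) for user in users]

    def find_user(self, username: str) -> Optional[dict[str, str]]:
        for user in self.load_users():
            if user["username"] == username:
                return user
        return None

    def create_user(self, username: str, password: str, role: str) -> dict[str, str]:
        if self.find_user(username) is not None:
            raise ValueError(f"user {username!r} already exists")
        user = {"username": username,
                "password_hash": self._hasher(password),
                "role": role}
        self.save_users(self.load_users() + [user])
        return user

    def delete_user(self, username: str) -> None:
        users = self.load_users()
        remaining = [u for u in users if u["username"] != username]
        if len(remaining) == len(users):
            raise LookupError(f"user {username!r} not found")
        self.save_users(remaining)
```

Because find_user, create_user, and delete_user go through load_users and save_users only, a different backend would need to replace just those two methods.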
#JSONFileUserStore
User storage backed by a JSON file.
#load_users
def load_users(self) -> list[dict[str, str]]
#save_users
def save_users(self, users: list[dict[str, str]]) -> None
#get_current_user
def get_current_user(request: Any) -> dict[str, Any]
Extract and validate a JWT from the Authorization header, session cookie, or query param.
Token resolution order:
- Authorization: Bearer <token> header
- session cookie
- ?token= query parameter
Reads the JWT secret from request.state["config"]["jwt_secret"]. Returns decoded claims dict. Raises HTTPError(401) on failure.
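The resolution order can be sketched against a raw ASGI scope. This is an illustration, not fastware's code: resolve_token and first_header are hypothetical names, and the cookie name "session" is taken from the set_session_cookies description on this page. Validation of the token is left out.

```python
from http.cookies import SimpleCookie
from typing import Optional
from urllib.parse import parse_qs


def first_header(headers: list, name: bytes) -> bytes:
    """Return the first header value matching a lowercase name, or b""."""
    for key, value in headers:
        if key.lower() == name:
            return value
    return b""


def resolve_token(scope: dict) -> Optional[str]:
    """Return the first token found: header, then cookie, then query string."""
    headers = scope.get("headers", [])

    # 1. Authorization: Bearer <token>
    auth = first_header(headers, b"authorization").decode("latin-1")
    scheme, _, credentials = auth.partition(" ")
    if scheme.lower() == "bearer" and credentials.strip():
        return credentials.strip()

    # 2. session cookie
    cookies = SimpleCookie()
    cookies.load(first_header(headers, b"cookie").decode("latin-1"))
    if "session" in cookies and cookies["session"].value:
        return cookies["session"].value

    # 3. ?token= query parameter
    query = parse_qs(scope.get("query_string", b"").decode("latin-1"))
    if query.get("token"):
        return query["token"][0]
    return None
```

Checking the header first lets API clients override a stale browser cookie, while the query parameter serves as a last resort for contexts that cannot set headers.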
#require_role
def require_role(role: str) -> Callable
Return a DI factory that checks the current user has the given role.
Usage: @router.get("/admin", deps={"user": require_role("admin")})
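The factory pattern behind require_role can be sketched as a closure over the required role. This is a hedged illustration only: sketch_require_role and Forbidden are hypothetical, the user lookup is passed in explicitly so the block stays self-contained, and the error raised by the real require_role on a role mismatch is not documented here.

```python
from typing import Any, Callable


class Forbidden(Exception):
    """Hypothetical stand-in for whatever error the framework raises."""


def sketch_require_role(
    role: str,
    get_user: Callable[[Any], dict[str, Any]],
) -> Callable[[Any], dict[str, Any]]:
    """Return a dependency that resolves the user, then checks the role claim."""

    def dependency(request: Any) -> dict[str, Any]:
        user = get_user(request)  # may itself raise for unauthenticated requests
        if user.get("role") != role:
            raise Forbidden(f"role {role!r} required")
        return user

    return dependency
```

The returned callable takes only the request, which is what lets it sit in a deps mapping next to a plain user dependency.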
#_get_asgi_header
def _get_asgi_header(headers: list[tuple[bytes, bytes]], name: bytes) -> bytes
Return the first header value matching name (lowercase), or b"".
#_get_asgi_cookie
def _get_asgi_cookie(headers: list[tuple[bytes, bytes]], cookie_name: str) -> str
Parse the Cookie header and return a single cookie value, or "".
#CSRFMiddleware
Double-submit cookie CSRF protection (pure ASGI).
For state-changing requests (POST, PUT, PATCH, DELETE) that aren't exempt, validates that:
1. A csrf_token cookie is present.
2. An X-CSRF-Token header is present.
3. The two values match.
Constructor args:
- app: inner ASGI application
- exempt_paths: list of path prefixes to skip CSRF checks
- disabled: bypass all checks (for testing)
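The three checks can be sketched as a pure ASGI middleware. This is not the CSRFMiddleware source: DoubleSubmitCSRF is a hypothetical class, and the 403 plain-text rejection is an assumption, since the page does not say how a failed check is reported.

```python
import hmac
from http.cookies import SimpleCookie

UNSAFE_METHODS = {"POST", "PUT", "PATCH", "DELETE"}


class DoubleSubmitCSRF:
    """Sketch of a double-submit cookie check as pure ASGI middleware."""

    def __init__(self, app, exempt_paths=(), disabled=False):
        self.app = app
        self.exempt_paths = tuple(exempt_paths)
        self.disabled = disabled

    async def __call__(self, scope, receive, send):
        if self._needs_check(scope) and not self._tokens_match(scope):
            # Assumed rejection shape: a bare 403. The real response may differ.
            await send({"type": "http.response.start", "status": 403,
                        "headers": [(b"content-type", b"text/plain")]})
            await send({"type": "http.response.body", "body": b"CSRF check failed"})
            return
        await self.app(scope, receive, send)

    def _needs_check(self, scope) -> bool:
        if self.disabled or scope["type"] != "http":
            return False
        if scope["method"] not in UNSAFE_METHODS:
            return False
        # str.startswith accepts a tuple, so this is a prefix match on any entry.
        return not scope["path"].startswith(self.exempt_paths)

    @staticmethod
    def _tokens_match(scope) -> bool:
        cookie_header, header_token = "", ""
        for name, value in scope["headers"]:
            if name == b"cookie" and not cookie_header:
                cookie_header = value.decode("latin-1")
            elif name == b"x-csrf-token" and not header_token:
                header_token = value.decode("latin-1")
        cookies = SimpleCookie()
        cookies.load(cookie_header)
        cookie_token = cookies["csrf_token"].value if "csrf_token" in cookies else ""
        # Both values must be present and equal; compare in constant time.
        return bool(cookie_token) and bool(header_token) and hmac.compare_digest(
            cookie_token.encode(), header_token.encode())
```

The scheme works because a cross-site attacker can cause the browser to send the cookie but cannot read it, and so cannot copy its value into the X-CSRF-Token header.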
#set_session_cookies
def set_session_cookies(token: str, csrf_token: str) -> list[str]
Build Set-Cookie header strings for session and CSRF cookies.
Returns a list of two Set-Cookie strings:
- session: httponly, samesite=lax (not readable by JS)
- csrf_token: js-readable (no httponly), samesite=lax
#clear_session_cookies
def clear_session_cookies() -> list[str]
Build Set-Cookie header strings that clear session and CSRF cookies.
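As a rough picture of what the two cookie helpers might return, the sketch below builds the header strings by hand. Only the cookie names, HttpOnly on the session cookie, and SameSite=Lax come from this page; Path=/ and clearing via Max-Age=0 are assumptions, and the sketch_ function names are hypothetical.

```python
def sketch_set_session_cookies(token: str, csrf_token: str) -> list[str]:
    """Session cookie is HttpOnly; the CSRF cookie stays readable by JS."""
    return [
        f"session={token}; Path=/; HttpOnly; SameSite=Lax",
        f"csrf_token={csrf_token}; Path=/; SameSite=Lax",
    ]


def sketch_clear_session_cookies() -> list[str]:
    """Expire both cookies immediately by sending Max-Age=0 (assumed approach)."""
    return [
        "session=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax",
        "csrf_token=; Path=/; Max-Age=0; SameSite=Lax",
    ]
```

The CSRF cookie omits HttpOnly on purpose: client-side code has to read it in order to echo the value back in the X-CSRF-Token header.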
#rate_limit
def rate_limit(rate: str, key_func: Callable | None = None) -> Callable
Decorator for per-client rate limiting using a token bucket.
Usage:
@router.get("/api/search")
@rate_limit("5/minute")
async def search(request): ...
Args:
- rate: Rate string like "5/minute", "10/second", "100/hour".
- key_func: Optional callable(request) -> str for custom bucket keys. Defaults to client IP from ASGI scope.
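A token bucket keyed per client can be sketched as follows. This is a minimal illustration rather than the decorator's internals: parse_rate and TokenBucket are hypothetical names, the bucket capacity is assumed to equal the count in the rate string, and the clock is injectable so the behaviour can be tested deterministically.

```python
import time
from typing import Callable

_UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600}


def parse_rate(rate: str) -> tuple[int, float]:
    """Turn a string like "5/minute" into (capacity, tokens refilled per second)."""
    count_text, _, unit = rate.partition("/")
    count = int(count_text)
    return count, count / _UNIT_SECONDS[unit]


class TokenBucket:
    """One bucket per key; each allowed request spends one token."""

    def __init__(self, rate: str, clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity, self.refill_per_second = parse_rate(rate)
        self._clock = clock
        # key -> (tokens remaining, time of last update)
        self._buckets: dict[str, tuple[float, float]] = {}

    def allow(self, key: str) -> bool:
        now = self._clock()
        tokens, last_seen = self._buckets.get(key, (float(self.capacity), now))
        # Refill in proportion to elapsed time, never above capacity.
        tokens = min(self.capacity, tokens + (now - last_seen) * self.refill_per_second)
        if tokens < 1:
            self._buckets[key] = (tokens, now)
            return False
        self._buckets[key] = (tokens - 1, now)
        return True
```

A decorator built on this would call allow(key_func(request)) before invoking the handler and reject the request when it returns False.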
#Practical Example: JWT Auth on a Route
This example sets up JWT authentication on protected routes using the dependency injection system. It verifies passwords against bcrypt hashes, issues a JWT at login, reads it back with get_current_user, and enforces role-based access control on specific endpoints:
import secrets

from fastware import Router, create_app
from fastware.auth import (
    create_token,
    get_current_user,
    require_role,
    hash_password,
    verify_password,
    CSRFMiddleware,
    set_session_cookies,
    clear_session_cookies,
)

# HTTPError, JSONResponse, and user_store (a UserStore instance) are assumed
# to be imported or defined elsewhere in your application.
router = Router()

@router.post("/login")
async def login(request):
    """Authenticate a user and return a JWT token with session cookies."""
    data = request.json
    username = data["username"]
    password = data["password"]
    # Look up the user (your storage layer here)
    user = user_store.find_user(username)
    if not user or not verify_password(password, user["password_hash"]):
        raise HTTPError(401, "Invalid credentials")
    # Create a JWT token and a fresh CSRF token for the double-submit cookie
    jwt_secret = request.state["config"]["jwt_secret"]
    token = create_token(username, user["role"], jwt_secret)
    csrf_token = secrets.token_urlsafe(32)
    return JSONResponse(
        {"token": token, "username": username},
        cookies=set_session_cookies(token, csrf_token),
    )

@router.get("/profile", deps={"user": get_current_user})
async def profile(request, user):
    """Return the authenticated user's profile."""
    return {"username": user["sub"], "role": user["role"]}

@router.get("/admin", deps={"user": require_role("admin")})
async def admin_dashboard(request, user):
    """Admin-only endpoint using role-based access control."""
    return {"message": f"Welcome, admin {user['sub']}"}

# Apply CSRF protection as middleware; /login is exempt because the client
# has no CSRF cookie before its first login.
app = create_app(
    router,
    middleware=[
        lambda app: CSRFMiddleware(app, exempt_paths=["/login"]),
    ],
)
The get_current_user dependency reads the JWT from the Authorization header, session cookie, or ?token= query parameter (in that order). It validates the token against the secret stored in request.state["config"]["jwt_secret"] and returns the decoded claims dict. The require_role factory wraps get_current_user with an additional role check.