fastware v0.1.0

API reference for fastware auth: JWT token creation and verification, bcrypt password hashing, user stores, CSRF middleware, and rate limiting.

#Auth API Reference

Warning

The auth module requires the fastware[auth] extra, which installs PyJWT and bcrypt. Install with:

bash
uv add "fastware[auth]"

Importing fastware.auth without these dependencies will raise ImportError at call time.

The auth module provides JWT token operations, password hashing, user storage, CSRF protection, rate limiting, and session cookie management. All functions are pure and DI-compatible, with no framework-specific dependencies beyond fastware's own ASGI types.

#src.fastware.auth

Authentication module providing JWT token creation and verification, bcrypt password hashing, user storage, CSRF protection, and rate limiting.

The module consists of pure functions and DI-compatible factories with no framework-specific dependencies beyond fastware's own ASGI types. This keeps auth logic testable and reusable.

#create_token

python
def create_token(username: str, role: str, secret: str, expires_hours: int = 720) -> str

Create an HS256-signed JWT with sub, role, exp, and iat claims. The default expires_hours of 720 corresponds to 30 days.

#verify_token

python
def verify_token(token: str, secret: str) -> dict[str, Any] | None

Decode and validate a JWT. Returns the claims dict, or None on any error.
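
To make the claim set and the "None on any error" contract concrete, here is a minimal standard-library sketch of HS256 signing and verification. It is not fastware's implementation (the module relies on PyJWT); the names sketch_create_token and sketch_verify_token are hypothetical stand-ins that mirror the documented signatures.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sketch_create_token(username: str, role: str, secret: str, expires_hours: int = 720) -> str:
    """Hypothetical stand-in: sign sub, role, iat, and exp claims with HS256."""
    now = int(time.time())
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"sub": username, "role": role, "iat": now, "exp": now + expires_hours * 3600}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode()) for part in (header, claims)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def sketch_verify_token(token: str, secret: str) -> Optional[dict[str, Any]]:
    """Hypothetical stand-in: return the claims, or None on any error."""
    try:
        signing_input, _, signature = token.rpartition(".")
        expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, _b64url_decode(signature)):
            return None
        header_b64, payload_b64 = signing_input.split(".")
        if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
            return None
        claims = json.loads(_b64url_decode(payload_b64))
        if claims["exp"] < time.time():
            return None  # expired
        return claims
    except (ValueError, KeyError, TypeError, AttributeError):
        return None
```

A token signed with one secret verifies only with that same secret; a wrong secret, a malformed string, or an expired exp claim all collapse to None rather than raising.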

#hash_password

python
def hash_password(plain: str) -> str

Hash a plaintext password with bcrypt.

#verify_password

python
def verify_password(plain: str, hashed: str) -> bool

Check a plaintext password against a bcrypt hash.
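
The hash/verify pair follows the usual salted-hash contract: every hash embeds its own random salt, and verification recomputes the digest and compares in constant time. bcrypt is not in the standard library, so the sketch below swaps in PBKDF2-HMAC-SHA256 purely to illustrate that contract; it is not how fastware hashes passwords, and the function names and the ITERATIONS value are hypothetical.

```python
import base64
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a fastware setting


def sketch_hash_password(plain: str) -> str:
    """Return 'iterations$salt$digest' with a fresh random salt per call."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", plain.encode(), salt, ITERATIONS)
    return f"{ITERATIONS}${base64.b64encode(salt).decode()}${base64.b64encode(digest).decode()}"


def sketch_verify_password(plain: str, hashed: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    try:
        iterations, salt_b64, digest_b64 = hashed.split("$")
        salt = base64.b64decode(salt_b64)
        expected = base64.b64decode(digest_b64)
        actual = hashlib.pbkdf2_hmac("sha256", plain.encode(), salt, int(iterations))
    except ValueError:
        return False  # malformed hash string
    return hmac.compare_digest(actual, expected)
```

Because the salt is random, hashing the same password twice yields two different strings, and both verify.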

#UserStore

Abstract user storage interface.

#load_users

python
def load_users(self) -> list[dict[str, str]]

#save_users

python
def save_users(self, users: list[dict[str, str]]) -> None

#find_user

python
def find_user(self, username: str) -> dict[str, str] | None

#create_user

python
def create_user(self, username: str, password: str, role: str) -> dict[str, str]

Create a new user. Raises ValueError if username already exists.

#delete_user

python
def delete_user(self, username: str) -> None

Delete a user by username. Raises LookupError if not found.
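
The interface splits into two storage primitives (load_users, save_users) and three operations that can be built on top of them. The sketch below assumes that split and shows an in-memory backend, which can be handy in tests. SketchUserStore, InMemoryUserStore, the injected hasher argument, and the "username" record key are hypothetical; "password_hash" and "role" follow the practical example later on this page.

```python
from abc import ABC, abstractmethod
from typing import Callable, Optional


class SketchUserStore(ABC):
    """Hypothetical base class mirroring the documented method signatures."""

    def __init__(self, hasher: Callable[[str], str]) -> None:
        # In real use this would be a password hasher such as hash_password.
        self._hasher = hasher

    @abstractmethod
    def load_users(self) -> list[dict[str, str]]:
        """Return every stored user record."""

    @abstractmethod
    def save_users(self, users: list[dict[str, str]]) -> None:
        """Persist the full list of user records."""

    def find_user(self, username: str) -> Optional[dict[str, str]]:
        return next((u for u in self.load_users() if u["username"] == username), None)

    def create_user(self, username: str, password: str, role: str) -> dict[str, str]:
        users = self.load_users()
        if any(u["username"] == username for u in users):
            raise ValueError(f"user {username!r} already exists")
        user = {"username": username, "password_hash": self._hasher(password), "role": role}
        self.save_users(users + [user])
        return user

    def delete_user(self, username: str) -> None:
        users = self.load_users()
        remaining = [u for u in users if u["username"] != username]
        if len(remaining) == len(users):
            raise LookupError(f"user {username!r} not found")
        self.save_users(remaining)


class InMemoryUserStore(SketchUserStore):
    """Keeps records in a list; copies on the way in and out to avoid aliasing."""

    def __init__(self, hasher: Callable[[str], str]) -> None:
        super().__init__(hasher)
        self._users: list[dict[str, str]] = []

    def load_users(self) -> list[dict[str, str]]:
        return [dict(u) for u in self._users]

    def save_users(self, users: list[dict[str, str]]) -> None:
        self._users = [dict(u) for u in users]
```

With this layout a new backend only has to implement the two storage primitives; the duplicate check and the not-found check are inherited.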

#JSONFileUserStore

User storage backed by a JSON file.

#load_users

python
def load_users(self) -> list[dict[str, str]]

#save_users

python
def save_users(self, users: list[dict[str, str]]) -> None
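
As a rough illustration of what a JSON-file backend involves, the sketch below implements the two storage methods with the standard library. The class name, the path constructor argument, and the write-then-rename strategy are assumptions for this example; the source does not document how JSONFileUserStore is constructed or how it writes.

```python
import json
import os
import tempfile
from pathlib import Path


class SketchJSONFileStore:
    """Hypothetical JSON-file persistence for a list of user records."""

    def __init__(self, path: str) -> None:
        self._path = Path(path)

    def load_users(self) -> list[dict[str, str]]:
        # A missing file is treated as an empty user list.
        if not self._path.exists():
            return []
        return json.loads(self._path.read_text(encoding="utf-8"))

    def save_users(self, users: list[dict[str, str]]) -> None:
        self._path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temp file in the same directory, then rename over the
        # target so readers never observe a half-written file.
        fd, tmp_name = tempfile.mkstemp(dir=self._path.parent, suffix=".tmp")
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(users, handle, indent=2)
        os.replace(tmp_name, self._path)
```

Each call re-reads or rewrites the whole file, which is simple and adequate for small user lists but not for high write volume.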

#get_current_user

python
def get_current_user(request: Any) -> dict[str, Any]

Extract and validate JWT from Authorization header, session cookie, or query param.

Token resolution order:

  1. Authorization: Bearer header
  2. session cookie
  3. ?token= query parameter

Reads the JWT secret from request.state["config"]["jwt_secret"]. Returns decoded claims dict. Raises HTTPError(401) on failure.
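
The resolution order above can be sketched against a raw ASGI scope. This is an illustration under assumptions, not fastware's code: sketch_resolve_token is a hypothetical name, it takes the scope dict directly rather than a request object, and it stops at locating the token (validation and the 401 are left out).

```python
from http.cookies import SimpleCookie
from typing import Optional
from urllib.parse import parse_qs


def sketch_resolve_token(scope: dict) -> Optional[str]:
    """Return the first token found, following the documented order."""
    # ASGI delivers headers as (name, value) byte pairs with lowercase names.
    # If a header repeats, this dict keeps the last occurrence.
    headers = {name.lower(): value for name, value in scope.get("headers", [])}

    # 1. Authorization: Bearer header
    auth = headers.get(b"authorization", b"").decode("latin-1")
    scheme, _, credentials = auth.partition(" ")
    if scheme.lower() == "bearer" and credentials.strip():
        return credentials.strip()

    # 2. session cookie
    jar = SimpleCookie()
    jar.load(headers.get(b"cookie", b"").decode("latin-1"))
    if "session" in jar and jar["session"].value:
        return jar["session"].value

    # 3. ?token= query parameter
    query = parse_qs(scope.get("query_string", b"").decode("latin-1"))
    tokens = query.get("token")
    return tokens[0] if tokens else None
```

When several sources are present the header wins, so an API client sending a Bearer token is not affected by a stale browser cookie.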

#require_role

python
def require_role(role: str) -> Callable

Return a DI factory that checks the current user has the given role.

Usage: @router.get("/admin", deps={"user": require_role("admin")})
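
A DI factory of this kind is a closure: the outer call fixes the role, and the returned callable runs once per request. The sketch below shows that shape under several assumptions: the user lookup is passed in explicitly to keep the example self-contained (the real factory takes only role), and SketchHTTPError with status 403 is a hypothetical stand-in, since the source does not state what is raised on a role mismatch.

```python
from typing import Any, Callable


class SketchHTTPError(Exception):
    """Hypothetical error type carrying an HTTP status code."""

    def __init__(self, status: int, detail: str) -> None:
        super().__init__(detail)
        self.status = status


def sketch_require_role(
    role: str, get_user: Callable[[Any], dict[str, Any]]
) -> Callable[[Any], dict[str, Any]]:
    """Return a per-request dependency that enforces the given role."""

    def dependency(request: Any) -> dict[str, Any]:
        user = get_user(request)  # authentication happens first
        if user.get("role") != role:
            # Assumption: a mismatch is reported as 403 Forbidden.
            raise SketchHTTPError(403, f"requires role {role!r}")
        return user

    return dependency
```

The handler receives the same claims dict it would get from the plain user dependency, so switching a route from authenticated to role-restricted does not change the handler body.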

#_get_asgi_header

python
def _get_asgi_header(headers: list[tuple[bytes, bytes]], name: bytes) -> bytes

Return the first header value matching name (lowercase), or b"".

#_get_asgi_cookie

python
def _get_asgi_cookie(headers: list[tuple[bytes, bytes]], cookie_name: str) -> str

Parse the Cookie header and return a single cookie value, or "".
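
For readers unfamiliar with the ASGI header format, the following sketch shows one way two such helpers could behave, using the standard library's cookie parser. The sketch_ names are hypothetical and the bodies are an approximation of the documented behavior, not the module's source.

```python
from http.cookies import SimpleCookie


def sketch_get_header(headers: list[tuple[bytes, bytes]], name: bytes) -> bytes:
    """Return the first value whose header name equals `name` (lowercase), or b""."""
    for key, value in headers:
        if key.lower() == name:
            return value
    return b""


def sketch_get_cookie(headers: list[tuple[bytes, bytes]], cookie_name: str) -> str:
    """Parse the Cookie header and return one cookie's value, or ""."""
    raw = sketch_get_header(headers, b"cookie").decode("latin-1")
    jar = SimpleCookie()
    jar.load(raw)
    morsel = jar.get(cookie_name)
    return morsel.value if morsel is not None else ""
```

Returning empty values instead of None lets callers test the result for truthiness without a separate presence check.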

#CSRFMiddleware

Double-submit cookie CSRF protection (pure ASGI).

For state-changing requests (POST, PUT, PATCH, DELETE) that aren't exempt, validates that:

  1. A csrf_token cookie is present.
  2. An X-CSRF-Token header is present.
  3. The two values match.

Constructor args:

  • app: inner ASGI application
  • exempt_paths: list of path prefixes to skip CSRF checks
  • disabled: bypass all checks (for testing)

#set_session_cookies

python
def set_session_cookies(token: str, csrf_token: str) -> list[str]

Build Set-Cookie header strings for session and CSRF cookies.

Returns a list of two Set-Cookie strings:

  • session: httponly, samesite=lax (not readable by JS)
  • csrf_token: js-readable (no httponly), samesite=lax

#clear_session_cookies

python
def clear_session_cookies() -> list[str]

Build Set-Cookie header strings that clear session and CSRF cookies.
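
The two cookie helpers above might produce header strings along these lines. The sketch is hypothetical: only the HttpOnly and SameSite=Lax attributes come from the description, while Path=/ and the Max-Age=0 expiry used for clearing are assumptions.

```python
def sketch_set_session_cookies(token: str, csrf_token: str) -> list[str]:
    """Build the two Set-Cookie values: session first, then csrf_token."""
    return [
        # HttpOnly keeps the session token out of reach of page scripts.
        f"session={token}; Path=/; HttpOnly; SameSite=Lax",
        # No HttpOnly here: client code must read this value to echo it
        # back in the X-CSRF-Token header.
        f"csrf_token={csrf_token}; Path=/; SameSite=Lax",
    ]


def sketch_clear_session_cookies() -> list[str]:
    """Build Set-Cookie values that expire both cookies immediately."""
    return [
        "session=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax",
        "csrf_token=; Path=/; Max-Age=0; SameSite=Lax",
    ]
```

A clearing cookie generally needs the same name and Path as the one it replaces, which is why both helpers use identical attributes apart from the value and expiry.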

#rate_limit

python
def rate_limit(rate: str, key_func: Callable | None = None) -> Callable

Decorator for per-client rate limiting using a token bucket.

Usage:

python
@router.get("/api/search")
@rate_limit("5/minute")
async def search(request): ...

Args:

  • rate: Rate string like "5/minute", "10/second", "100/hour".
  • key_func: Optional callable(request) -> str for custom bucket keys. Defaults to client IP from ASGI scope.
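
A token bucket keyed per client can be sketched as follows. This is a generic illustration of the technique, not fastware's decorator: parse_rate, TokenBucket, and SketchRateLimiter are hypothetical names, and the injectable clock exists only so the behavior can be checked without sleeping.

```python
import time
from typing import Callable

_PERIODS = {"second": 1.0, "minute": 60.0, "hour": 3600.0}


def parse_rate(rate: str) -> tuple[int, float]:
    """Turn '5/minute' into (5, 60.0)."""
    count, _, unit = rate.partition("/")
    return int(count), _PERIODS[unit.strip().lower()]


class TokenBucket:
    """Holds up to `capacity` tokens, refilled continuously over `period` seconds."""

    def __init__(self, capacity: int, period: float, clock: Callable[[], float]) -> None:
        self.capacity = capacity
        self.refill_per_second = capacity / period
        self.tokens = float(capacity)  # start full so a burst is allowed
        self.clock = clock
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


class SketchRateLimiter:
    """One bucket per key, for example per client IP."""

    def __init__(self, rate: str, clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity, self.period = parse_rate(rate)
        self.clock = clock
        self.buckets: dict[str, TokenBucket] = {}

    def allow(self, key: str) -> bool:
        bucket = self.buckets.get(key)
        if bucket is None:
            bucket = self.buckets[key] = TokenBucket(self.capacity, self.period, self.clock)
        return bucket.allow()
```

Unlike a fixed window counter, the bucket refills gradually, so a client that has exhausted "2/second" regains one request after half a second rather than waiting for the next window.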

#Practical Example: JWT Auth on a Route

This example sets up JWT authentication on protected routes using the dependency injection system. It shows how to verify passwords against bcrypt hashes, issue JWT tokens at login, read them back with get_current_user, and enforce role-based access control on specific endpoints:

python
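import secrets

# Assumption: HTTPError and JSONResponse are exported from the top-level
# fastware package; adjust this import to match where your version defines them.
from fastware import HTTPError, JSONResponse, Router, create_app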
from fastware.auth import (
    create_token,
    get_current_user,
    require_role,
    hash_password,
    verify_password,
    CSRFMiddleware,
    set_session_cookies,
    clear_session_cookies,
)

router = Router()


@router.post("/login")
async def login(request):
    """Authenticate a user and return a JWT token with session cookies."""
    data = request.json
    username = data["username"]
    password = data["password"]

    # Look up the user. user_store is assumed to be a UserStore instance
    # (for example a JSONFileUserStore) created at application startup.
    user = user_store.find_user(username)
    if not user or not verify_password(password, user["password_hash"]):
        raise HTTPError(401, "Invalid credentials")

    # Create a JWT token
    jwt_secret = request.state["config"]["jwt_secret"]
    token = create_token(username, user["role"], jwt_secret)
    csrf_token = secrets.token_urlsafe(32)

    return JSONResponse(
        {"token": token, "username": username},
        cookies=set_session_cookies(token, csrf_token),
    )


@router.get("/profile", deps={"user": get_current_user})
async def profile(request, user):
    """Return the authenticated user's profile."""
    return {"username": user["sub"], "role": user["role"]}


@router.get("/admin", deps={"user": require_role("admin")})
async def admin_dashboard(request, user):
    """Admin-only endpoint using role-based access control."""
    return {"message": f"Welcome, admin {user['sub']}"}


# Apply CSRF protection as middleware
app = create_app(
    router,
    middleware=[
        lambda app: CSRFMiddleware(app, exempt_paths=["/login"]),
    ],
)

The get_current_user dependency reads the JWT from the Authorization header, session cookie, or ?token= query parameter (in that order). It validates the token against the secret stored in request.state["config"]["jwt_secret"] and returns the decoded claims dict. The require_role factory wraps get_current_user with an additional role check.