On this page
Authentication module providing JWT token creation and verification, bcrypt password hashing, user storage, CSRF protection, and rate limiting.
#src.fastware.auth
#src.fastware.auth
Authentication module providing JWT token creation and verification, bcrypt password hashing, user storage, CSRF protection, and rate limiting.
Pure functions and DI-compatible factories with no framework-specific dependencies beyond fastware's own asgi types. Keeps auth logic testable and reusable.
#create_token
def create_token(username: str, role: str, secret: str, expires_hours: int=720) -> strCreate a signed JWT with sub, role, exp, and iat claims (HS256).
#verify_token
def verify_token(token: str, secret: str) -> dict[str, Any] | NoneDecode and validate a JWT. Returns claims dict or None on any error.
#hash_password
def hash_password(plain: str) -> strHash a plaintext password with bcrypt.
#verify_password
def verify_password(plain: str, hashed: str) -> boolCheck a plaintext password against a bcrypt hash.
#UserStore
Abstract user storage interface.
#load_users
def load_users(self) -> list[dict[str, str]]#save_users
def save_users(self, users: list[dict[str, str]]) -> None#find_user
def find_user(self, username: str) -> dict[str, str] | None#create_user
def create_user(self, username: str, password: str, role: str) -> dict[str, str]Create a new user. Raises ValueError if username already exists.
#delete_user
def delete_user(self, username: str) -> NoneDelete a user by username. Raises LookupError if not found.
#JSONFileUserStore
User storage backed by a JSON file.
#load_users
def load_users(self) -> list[dict[str, str]]#save_users
def save_users(self, users: list[dict[str, str]]) -> None#get_current_user
def get_current_user(request: Any) -> dict[str, Any]Extract and validate JWT from Authorization header, session cookie, or query param.
Token resolution order:
- Authorization: Bearer
header - session cookie
- ?token= query parameter
Reads the JWT secret from request.state["config"]["jwt_secret"]. Returns decoded claims dict. Raises HTTPError(401) on failure.
#require_role
def require_role(role: str) -> CallableReturn a DI factory that checks the current user has the given role.
Usage: @router.get("/admin", deps={"user": require_role("admin")})
#_get_asgi_header
def _get_asgi_header(headers: list[tuple[bytes, bytes]], name: bytes) -> bytesReturn the first header value matching name (lowercase), or b"".
#_get_asgi_cookie
def _get_asgi_cookie(headers: list[tuple[bytes, bytes]], cookie_name: str) -> strParse the Cookie header and return a single cookie value, or "".
#CSRFMiddleware
Double-submit cookie CSRF protection (pure ASGI).
For state-changing requests (POST, PUT, PATCH, DELETE) that aren't exempt, validates that: 1. A csrf_token cookie is present. 2. An X-CSRF-Token header is present. 3. The two values match.
Constructor args: app: inner ASGI application exempt_paths: list of path prefixes to skip CSRF checks disabled: bypass all checks (for testing)
#set_session_cookies
def set_session_cookies(token: str, csrf_token: str) -> list[str]Build Set-Cookie header strings for session and CSRF cookies.
Returns a list of two Set-Cookie strings:
- session: httponly, samesite=lax (not readable by JS)
- csrf_token: js-readable (no httponly), samesite=lax
#clear_session_cookies
def clear_session_cookies() -> list[str]Build Set-Cookie header strings that clear session and CSRF cookies.
#rate_limit
def rate_limit(rate: str, key_func: Callable | None=None) -> CallableDecorator for per-client rate limiting using a token bucket.
Usage: @router.get("/api/search") @rate_limit("5/minute") async def search(request): ...
Args:
rate: Rate string like "5/minute", "10/second", "100/hour".key_func: Optional callable(request) -> str for custom bucket keys.
Defaults to client IP from ASGI scope.