fastware v0.1.0 /src.fastware.auth

#src.fastware.auth

Authentication module providing JWT token creation and verification, bcrypt password hashing, user storage, CSRF protection, and rate limiting.

The module consists of pure functions and DI-compatible factories with no framework-specific dependencies beyond fastware's own ASGI types, which keeps the auth logic testable and reusable.

#create_token

python
def create_token(username: str, role: str, secret: str, expires_hours: int=720) -> str

Create a signed JWT with sub, role, exp, and iat claims (HS256).
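To make the claim layout concrete, here is a minimal standard-library sketch of HS256 signing. It is not fastware's implementation: `sketch_create_token` and `_b64url` are hypothetical names, and the sketch only assumes that the token carries the four claims listed above. Note that the default of 720 hours corresponds to 30 days.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sketch_create_token(username: str, role: str, secret: str,
                        expires_hours: int = 720) -> str:
    """Hypothetical stand-in for create_token (assumption, not fastware code)."""
    now = int(time.time())
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {
        "sub": username,                      # subject: who the token is for
        "role": role,                         # application-specific claim
        "iat": now,                           # issued-at, seconds since epoch
        "exp": now + expires_hours * 3600,    # expiry, seconds since epoch
    }
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret.encode("utf-8"),
                         signing_input.encode("ascii"),
                         hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


token = sketch_create_token("alice", "admin", "dev-secret", expires_hours=1)
print(token.count("."))  # a compact JWT has three dot-separated segments
```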

#verify_token

python
def verify_token(token: str, secret: str) -> dict[str, Any] | None

Decode and validate a JWT. Returns claims dict or None on any error.
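The "None on any error" contract can be sketched with the standard library as follows. This is an illustration under assumptions, not fastware's code: `sketch_verify_token` and `_demo_token` are hypothetical names, and treating a missing `exp` claim as invalid is a choice made for this sketch.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Optional


def _b64url_decode(segment: str) -> bytes:
    """Decode base64url after restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def sketch_verify_token(token: str, secret: str) -> Optional[dict[str, Any]]:
    """Hypothetical stand-in for verify_token: claims dict, or None on any error."""
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
            return None
        expected = hmac.new(secret.encode("utf-8"),
                            f"{header_b64}.{payload_b64}".encode("ascii"),
                            hashlib.sha256).digest()
        # Constant-time comparison avoids leaking how many bytes matched.
        if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
            return None
        claims = json.loads(_b64url_decode(payload_b64))
        # Assumption for this sketch: a missing exp claim counts as expired.
        if claims.get("exp", 0) <= time.time():
            return None
        return claims
    except (ValueError, TypeError, AttributeError):
        return None


def _demo_token(claims: dict, secret: str) -> str:
    """Build an HS256 token for the demo below (hypothetical helper)."""
    def enc(raw: bytes) -> str:
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
    head = enc(json.dumps({"alg": "HS256", "typ": "JWT"}).encode("utf-8"))
    body = enc(json.dumps(claims).encode("utf-8"))
    sig = hmac.new(secret.encode("utf-8"), f"{head}.{body}".encode("ascii"),
                   hashlib.sha256).digest()
    return f"{head}.{body}.{enc(sig)}"


good = _demo_token({"sub": "alice", "exp": int(time.time()) + 60}, "dev-secret")
stale = _demo_token({"sub": "alice", "exp": int(time.time()) - 60}, "dev-secret")
print(sketch_verify_token(good, "dev-secret") is not None)
print(sketch_verify_token(stale, "dev-secret") is None)
```

Returning None for every failure keeps callers from branching on error types, at the cost of hiding why a token was rejected.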

#hash_password

python
def hash_password(plain: str) -> str

Hash a plaintext password with bcrypt.

#verify_password

python
def verify_password(plain: str, hashed: str) -> bool

Check a plaintext password against a bcrypt hash.

#UserStore

Abstract user storage interface.

#load_users

python
def load_users(self) -> list[dict[str, str]]

#save_users

python
def save_users(self, users: list[dict[str, str]]) -> None

#find_user

python
def find_user(self, username: str) -> dict[str, str] | None

#create_user

python
def create_user(self, username: str, password: str, role: str) -> dict[str, str]

Create a new user. Raises ValueError if username already exists.

#delete_user

python
def delete_user(self, username: str) -> None

Delete a user by username. Raises LookupError if not found.
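The error contract of `create_user` and `delete_user` can be sketched as below. The sketch assumes that the base class builds `find_user`, `create_user`, and `delete_user` on top of `load_users` and `save_users`; the document does not confirm this. The class names, the record keys (`username`, `password_hash`, `role`), and the injected `hash_fn` are all hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Callable, Optional


class SketchUserStore(ABC):
    """Hypothetical stand-in for UserStore (assumed layering, not fastware code)."""

    def __init__(self, hash_fn: Callable[[str], str]) -> None:
        self._hash_fn = hash_fn  # in real use this would be a bcrypt hasher

    @abstractmethod
    def load_users(self) -> list[dict[str, str]]: ...

    @abstractmethod
    def save_users(self, users: list[dict[str, str]]) -> None: ...

    def find_user(self, username: str) -> Optional[dict[str, str]]:
        return next((u for u in self.load_users()
                     if u["username"] == username), None)

    def create_user(self, username: str, password: str, role: str) -> dict[str, str]:
        users = self.load_users()
        if any(u["username"] == username for u in users):
            raise ValueError(f"user {username!r} already exists")
        user = {"username": username,
                "password_hash": self._hash_fn(password),
                "role": role}
        users.append(user)
        self.save_users(users)
        return user

    def delete_user(self, username: str) -> None:
        users = self.load_users()
        remaining = [u for u in users if u["username"] != username]
        if len(remaining) == len(users):
            raise LookupError(f"user {username!r} not found")
        self.save_users(remaining)


class InMemoryUserStore(SketchUserStore):
    """Backend that keeps users in a list; handy in tests."""

    def __init__(self, hash_fn: Callable[[str], str]) -> None:
        super().__init__(hash_fn)
        self._users: list[dict[str, str]] = []

    def load_users(self) -> list[dict[str, str]]:
        return [dict(u) for u in self._users]

    def save_users(self, users: list[dict[str, str]]) -> None:
        self._users = [dict(u) for u in users]


# Placeholder hasher for the demo only; it is NOT a real password hash.
store = InMemoryUserStore(hash_fn=lambda plain: "demo-hash:" + plain[::-1])
store.create_user("alice", "s3cret", "admin")
print(store.find_user("alice")["role"])
```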

#JSONFileUserStore

User storage backed by a JSON file.

#load_users

python
def load_users(self) -> list[dict[str, str]]

#save_users

python
def save_users(self, users: list[dict[str, str]]) -> None
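A file-backed pair of `load_users` and `save_users` might look like the sketch below. The constructor argument, the "missing file means no users" behavior, and the write-then-rename step are assumptions made for illustration; `SketchJSONFileStore` is a hypothetical name and the real class may differ on every one of these points.

```python
import json
import os
import tempfile
from pathlib import Path


class SketchJSONFileStore:
    """Hypothetical JSON-file backend (assumed behavior, not fastware code)."""

    def __init__(self, path: str) -> None:
        self._path = Path(path)

    def load_users(self) -> list[dict[str, str]]:
        # Assumption: a missing file is treated as an empty user list.
        if not self._path.exists():
            return []
        with self._path.open(encoding="utf-8") as fh:
            return json.load(fh)

    def save_users(self, users: list[dict[str, str]]) -> None:
        # Write to a sibling temp file, then rename over the target so a
        # crash mid-write cannot leave a half-written users file behind.
        tmp_path = self._path.with_name(self._path.name + ".tmp")
        with tmp_path.open("w", encoding="utf-8") as fh:
            json.dump(users, fh, indent=2)
        os.replace(tmp_path, self._path)


with tempfile.TemporaryDirectory() as tmp_dir:
    demo_store = SketchJSONFileStore(os.path.join(tmp_dir, "users.json"))
    before = demo_store.load_users()
    demo_store.save_users([{"username": "alice", "role": "admin"}])
    after = demo_store.load_users()

print(before, after)
```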

#get_current_user

python
def get_current_user(request: Any) -> dict[str, Any]

Extract and validate JWT from Authorization header, session cookie, or query param.

Token resolution order:

  1. Authorization: Bearer header
  2. session cookie
  3. ?token= query parameter

Reads the JWT secret from request.state["config"]["jwt_secret"]. Returns decoded claims dict. Raises HTTPError(401) on failure.
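The resolution order above can be sketched against a raw ASGI scope. This is an illustration only: `sketch_resolve_token` is a hypothetical name, and reading from the scope dict directly (rather than from fastware's request object) is an assumption made to keep the example self-contained.

```python
from http.cookies import SimpleCookie
from typing import Optional
from urllib.parse import parse_qs


def _first_header(scope: dict, name: bytes) -> str:
    """Return the first matching header value as text, or ""."""
    for key, value in scope.get("headers", []):
        if key == name:
            return value.decode("latin-1")
    return ""


def sketch_resolve_token(scope: dict) -> Optional[str]:
    """Pick the token source in the documented order (hypothetical helper)."""
    # 1. Authorization: Bearer <token>
    scheme, _, credentials = _first_header(scope, b"authorization").partition(" ")
    if scheme.lower() == "bearer" and credentials.strip():
        return credentials.strip()

    # 2. session cookie
    cookies = SimpleCookie()
    cookies.load(_first_header(scope, b"cookie"))
    if "session" in cookies and cookies["session"].value:
        return cookies["session"].value

    # 3. ?token= query parameter
    query = parse_qs(scope.get("query_string", b"").decode("latin-1"))
    tokens = query.get("token")
    return tokens[0] if tokens else None


scope = {
    "headers": [(b"authorization", b"Bearer from-header"),
                (b"cookie", b"session=from-cookie")],
    "query_string": b"token=from-query",
}
print(sketch_resolve_token(scope))  # the header takes precedence
```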

#require_role

python
def require_role(role: str) -> Callable

Return a DI factory that checks the current user has the given role.

Usage: @router.get("/admin", deps={"user": require_role("admin")})
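The factory pattern behind `require_role` can be sketched as a closure. Several details here are assumptions: the real function takes only `role`, whereas this sketch injects a `get_user` callable to stay self-contained; `SketchHTTPError` is a hypothetical stand-in for fastware's error type; and the 403 status on a role mismatch is not stated in the documentation.

```python
from typing import Any, Callable


class SketchHTTPError(Exception):
    """Hypothetical stand-in for the framework's HTTP error type."""

    def __init__(self, status: int, detail: str = "") -> None:
        super().__init__(detail)
        self.status = status


def sketch_require_role(
    role: str,
    get_user: Callable[[Any], dict],
) -> Callable[[Any], dict]:
    """Return a dependency that resolves the user and enforces a role."""

    def dependency(request: Any) -> dict:
        user = get_user(request)  # may itself raise a 401-style error
        if user.get("role") != role:
            # Assumption: a role mismatch maps to 403 Forbidden.
            raise SketchHTTPError(403, f"role {role!r} required")
        return user

    return dependency


# Demo resolver: the "request" is simply the claims dict itself.
admin_only = sketch_require_role("admin", get_user=lambda request: request)
print(admin_only({"sub": "alice", "role": "admin"})["sub"])
```

Because the factory returns a plain callable, the role check can be unit-tested without a router or a running server.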

#_get_asgi_header

python
def _get_asgi_header(headers: list[tuple[bytes, bytes]], name: bytes) -> bytes

Return the value of the first header whose name matches name (given in lowercase), or b"" if there is none.

#_get_asgi_cookie

python
def _get_asgi_cookie(headers: list[tuple[bytes, bytes]], cookie_name: str) -> str

Parse the Cookie header and return a single cookie value, or "".
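Both helpers operate on the raw ASGI header list, in which names and values are bytes. A minimal sketch of the described behavior follows; the `sketch_` names are hypothetical and the parsing is deliberately simple (no quoted-value handling).

```python
def sketch_get_header(headers: list[tuple[bytes, bytes]], name: bytes) -> bytes:
    """Return the first value whose header name equals name, or b""."""
    for key, value in headers:
        if key == name:  # ASGI servers deliver header names in lowercase
            return value
    return b""


def sketch_get_cookie(headers: list[tuple[bytes, bytes]], cookie_name: str) -> str:
    """Return one cookie's value from the Cookie header, or ""."""
    raw = sketch_get_header(headers, b"cookie").decode("latin-1")
    for pair in raw.split(";"):
        key, sep, value = pair.strip().partition("=")
        if sep and key == cookie_name:
            return value
    return ""


headers = [
    (b"host", b"example.test"),
    (b"cookie", b"session=abc; csrf_token=xyz"),
]
print(sketch_get_header(headers, b"host"))
print(sketch_get_cookie(headers, "csrf_token"))
```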

#CSRFMiddleware

Double-submit cookie CSRF protection (pure ASGI).

For state-changing requests (POST, PUT, PATCH, DELETE) that aren't exempt, validates that:

  1. A csrf_token cookie is present.
  2. An X-CSRF-Token header is present.
  3. The two values match.

Constructor args:

  • app: inner ASGI application
  • exempt_paths: list of path prefixes to skip CSRF checks
  • disabled: bypass all checks (for testing)
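The double-submit check can be sketched as a pure ASGI middleware. This is an illustration, not fastware's class: `SketchCSRFMiddleware` and `demo_status` are hypothetical names, and the 403 plain-text rejection is an assumption, since the documentation does not say how a failed check is reported.

```python
import asyncio
import hmac

UNSAFE_METHODS = frozenset({"POST", "PUT", "PATCH", "DELETE"})


def _cookie_value(cookie_header: bytes, name: bytes) -> bytes:
    """Return one cookie's raw value from a Cookie header, or b""."""
    for pair in cookie_header.split(b";"):
        key, _, value = pair.strip().partition(b"=")
        if key == name:
            return value
    return b""


class SketchCSRFMiddleware:
    def __init__(self, app, exempt_paths=None, disabled=False):
        self.app = app
        self.exempt_paths = tuple(exempt_paths or ())
        self.disabled = disabled

    def _should_skip(self, scope) -> bool:
        return (self.disabled
                or scope["type"] != "http"
                or scope["method"] not in UNSAFE_METHODS
                or scope["path"].startswith(self.exempt_paths))

    def _tokens_match(self, scope) -> bool:
        headers = dict(scope.get("headers", []))
        cookie_token = _cookie_value(headers.get(b"cookie", b""), b"csrf_token")
        header_token = headers.get(b"x-csrf-token", b"")
        # Both values must be present and equal (constant-time comparison).
        return (bool(cookie_token) and bool(header_token)
                and hmac.compare_digest(cookie_token, header_token))

    async def __call__(self, scope, receive, send):
        if self._should_skip(scope) or self._tokens_match(scope):
            await self.app(scope, receive, send)
            return
        # Assumption: failed checks are answered with 403 Forbidden.
        await send({"type": "http.response.start", "status": 403,
                    "headers": [(b"content-type", b"text/plain")]})
        await send({"type": "http.response.body", "body": b"CSRF check failed"})


async def demo_status(method: str, headers: list) -> int:
    """Run one request through the middleware and return the status code."""
    sent = []

    async def inner_app(scope, receive, send):
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"ok"})

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    app = SketchCSRFMiddleware(inner_app, exempt_paths=["/login"])
    scope = {"type": "http", "method": method, "path": "/items", "headers": headers}
    await app(scope, receive, send)
    return sent[0]["status"]


matching = [(b"cookie", b"session=abc; csrf_token=t0k"), (b"x-csrf-token", b"t0k")]
print(asyncio.run(demo_status("POST", matching)))
```

The scheme works because a cross-site page can cause the browser to send the cookie but cannot read it, so it cannot copy the value into the header.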

#set_session_cookies

python
def set_session_cookies(token: str, csrf_token: str) -> list[str]

Build Set-Cookie header strings for session and CSRF cookies.

Returns a list of two Set-Cookie strings:

  • session: httponly, samesite=lax (not readable by JS)
  • csrf_token: js-readable (no httponly), samesite=lax

#clear_session_cookies

python
def clear_session_cookies() -> list[str]

Build Set-Cookie header strings that clear session and CSRF cookies.
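The two cookie helpers can be sketched as plain string builders. Only the HttpOnly and SameSite attributes come from the description above; `Path=/` and the use of `Max-Age=0` to clear cookies are assumptions for this sketch, and the `sketch_` names are hypothetical.

```python
import secrets


def sketch_set_session_cookies(token: str, csrf_token: str) -> list[str]:
    """Build the two Set-Cookie values (assumed attributes: Path=/)."""
    return [
        # HttpOnly keeps the session token out of reach of page scripts.
        f"session={token}; Path=/; HttpOnly; SameSite=Lax",
        # No HttpOnly: scripts must read this value to echo it in a header.
        f"csrf_token={csrf_token}; Path=/; SameSite=Lax",
    ]


def sketch_clear_session_cookies() -> list[str]:
    """Build Set-Cookie values that expire both cookies immediately."""
    return [
        "session=; Path=/; Max-Age=0; HttpOnly; SameSite=Lax",
        "csrf_token=; Path=/; Max-Age=0; SameSite=Lax",
    ]


cookies = sketch_set_session_cookies("header.payload.sig", secrets.token_urlsafe(32))
for value in cookies + sketch_clear_session_cookies():
    print(value)
```

Each returned string is intended to be sent as its own Set-Cookie response header.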

#rate_limit

python
def rate_limit(rate: str, key_func: Callable | None=None) -> Callable

Decorator for per-client rate limiting using a token bucket.

Usage:

python
@router.get("/api/search")
@rate_limit("5/minute")
async def search(request): ...

Args:

  • rate: Rate string like "5/minute", "10/second", "100/hour".
  • key_func: Optional callable(request) -> str for custom bucket keys. Defaults to the client IP from the ASGI scope.
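A token bucket driven by a rate string can be sketched as follows. This is not fastware's implementation: the `sketch_`-prefixed and helper names are hypothetical, the request is assumed to be a dict shaped like an ASGI scope, the injectable `clock` exists only to make the demo deterministic, and raising an exception when the bucket is empty is a stand-in for whatever response the real decorator produces.

```python
import asyncio
import functools
import time

_PERIOD_SECONDS = {"second": 1.0, "minute": 60.0, "hour": 3600.0}


class SketchTooManyRequests(Exception):
    """Hypothetical signal that a client's bucket is empty."""


def parse_rate(rate: str) -> tuple[int, float]:
    """Turn "5/minute" into (5, 60.0)."""
    count, _, unit = rate.partition("/")
    return int(count), _PERIOD_SECONDS[unit.strip()]


class TokenBucket:
    def __init__(self, capacity: int, period: float, now: float) -> None:
        self.capacity = capacity
        self.refill_per_second = capacity / period
        self.tokens = float(capacity)  # start full so bursts are allowed
        self.updated = now

    def allow(self, now: float) -> bool:
        # Refill in proportion to elapsed time, capped at capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity,
                          self.tokens + elapsed * self.refill_per_second)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


def sketch_rate_limit(rate, key_func=None, clock=time.monotonic):
    capacity, period = parse_rate(rate)
    buckets = {}  # one bucket per client key

    def default_key(request) -> str:
        # Assumption: request behaves like an ASGI scope with a client pair.
        return request["client"][0]

    def decorator(handler):
        @functools.wraps(handler)
        async def wrapper(request, *args, **kwargs):
            key = (key_func or default_key)(request)
            now = clock()
            bucket = buckets.get(key)
            if bucket is None:
                bucket = buckets[key] = TokenBucket(capacity, period, now)
            if not bucket.allow(now):
                raise SketchTooManyRequests(key)
            return await handler(request, *args, **kwargs)
        return wrapper
    return decorator


fake_now = [0.0]  # mutable fake clock for the demo


@sketch_rate_limit("2/second", clock=lambda: fake_now[0])
async def search(request):
    return "ok"


print(asyncio.run(search({"client": ("10.0.0.1", 50000)})))
```

Unlike a fixed window counter, the bucket refills continuously, so a client that pauses briefly regains capacity gradually rather than all at once at a window boundary.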