"""FastAPI app for Bookly. Hosts /api/chat, /health, and the static chat UI. Security posture notes: - Sessions are server-issued and HMAC-signed. The client never chooses its own session ID, so a leaked or guessed body cannot hijack someone else's chat history. See `_resolve_session`. - Every response carries a strict Content-Security-Policy and related headers (see `security_headers`). The chat UI already uses `textContent` for model replies, so XSS is structurally impossible; CSP is defense in depth for future edits. - In-memory sliding-window rate limiting is applied per IP and per session. Suitable for a single-process demo deployment; swap to a shared store for multi-worker. """ from __future__ import annotations import hashlib import hmac import logging import secrets import threading import time from collections import defaultdict, deque from pathlib import Path import anthropic from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field import agent from config import settings logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s") logger = logging.getLogger("bookly.server") app = FastAPI(title="Bookly", docs_url=None, redoc_url=None) # --------------------------------------------------------------------------- # Security headers # --------------------------------------------------------------------------- _SECURITY_HEADERS: dict[str, str] = { # Tight CSP: only same-origin assets, no inline scripts, no embedding. # The UI is plain HTML+JS under /static, all same-origin. "Content-Security-Policy": ( "default-src 'self'; " "script-src 'self'; " "style-src 'self'; " "img-src 'self' data:; " "connect-src 'self'; " "object-src 'none'; " "base-uri 'none'; " "frame-ancestors 'none'; " "form-action 'self'" ), "X-Content-Type-Options": "nosniff", "X-Frame-Options": "DENY", "Referrer-Policy": "no-referrer", "Permissions-Policy": "geolocation=(), microphone=(), camera=()", } @app.middleware("http") async def security_headers(request: Request, call_next): response = await call_next(request) for header_name, header_value in _SECURITY_HEADERS.items(): response.headers.setdefault(header_name, header_value) return response # --------------------------------------------------------------------------- # Sliding-window rate limiter (in-memory) # --------------------------------------------------------------------------- class SlidingWindowRateLimiter: """Per-key request counter over a fixed trailing window. Not meant to be bulletproof -- this is a small demo deployment, not an edge-network WAF. Enforces a ceiling per IP and per session so a single caller cannot burn the Anthropic budget or exhaust memory by spamming `/api/chat`. """ def __init__(self, *, window_seconds: int = 60) -> None: if window_seconds <= 0: raise ValueError("window_seconds must be positive") self._window = window_seconds self._hits: defaultdict[str, deque[float]] = defaultdict(deque) self._lock = threading.Lock() def check(self, key: str, max_hits: int) -> bool: """Record a hit on `key`. Returns True if under the limit, False otherwise.""" if max_hits <= 0: return False now = time.monotonic() cutoff = now - self._window with self._lock: bucket = self._hits[key] while bucket and bucket[0] < cutoff: bucket.popleft() if len(bucket) >= max_hits: return False bucket.append(now) return True _rate_limiter = SlidingWindowRateLimiter(window_seconds=60) def _client_ip(request: Request) -> str: """Best-effort client IP for rate limiting. If the app is deployed behind a reverse proxy, set the proxy to add `X-Forwarded-For` and trust the first hop. Otherwise fall back to the direct client address. """ forwarded = request.headers.get("x-forwarded-for", "") if forwarded: first = forwarded.split(",", 1)[0].strip() if first: return first if request.client is not None: return request.client.host return "unknown" # --------------------------------------------------------------------------- # Session cookies (server-issued, HMAC-signed) # --------------------------------------------------------------------------- _SESSION_COOKIE_SEPARATOR = "." def _sign_session_id(session_id: str) -> str: secret = settings.session_secret.get_secret_value().encode("utf-8") signature = hmac.new(secret, session_id.encode("utf-8"), hashlib.sha256).hexdigest() return f"{session_id}{_SESSION_COOKIE_SEPARATOR}{signature}" def _verify_signed_session(signed_value: str) -> str | None: if not signed_value or _SESSION_COOKIE_SEPARATOR not in signed_value: return None session_id, _, signature = signed_value.partition(_SESSION_COOKIE_SEPARATOR) if not session_id or not signature: return None expected = _sign_session_id(session_id) # Compare the full signed form in constant time to avoid timing leaks on # the signature bytes. if not hmac.compare_digest(expected, signed_value): return None return session_id def _issue_new_session_id() -> str: return secrets.token_urlsafe(24) def _resolve_session(request: Request, response: Response) -> str: """Return the session_id for this request, issuing a fresh cookie if needed. The client never chooses the session_id. Anything in the request body that claims to be one is ignored. If the cookie is missing or tampered with, we mint a new session_id and set the cookie on the response. """ signed_cookie = request.cookies.get(settings.session_cookie_name, "") session_id = _verify_signed_session(signed_cookie) if session_id is not None: return session_id session_id = _issue_new_session_id() response.set_cookie( key=settings.session_cookie_name, value=_sign_session_id(session_id), max_age=settings.session_cookie_max_age_seconds, httponly=True, secure=settings.session_cookie_secure, samesite="lax", path="/", ) return session_id # --------------------------------------------------------------------------- # Request/response models # --------------------------------------------------------------------------- class ChatRequest(BaseModel): # `session_id` is intentionally NOT accepted from clients. Sessions are # tracked server-side via the signed cookie. message: str = Field(..., min_length=1, max_length=4000) class ChatResponse(BaseModel): reply: str # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @app.get("/health") def health() -> dict: return {"status": "ok"} @app.get("/") def root() -> RedirectResponse: return RedirectResponse(url="/static/index.html") @app.post("/api/chat", response_model=ChatResponse) def chat(body: ChatRequest, http_request: Request, http_response: Response) -> ChatResponse: session_id = _resolve_session(http_request, http_response) ip = _client_ip(http_request) if not _rate_limiter.check(f"ip:{ip}", settings.rate_limit_per_ip_per_minute): logger.info("rate_limited scope=ip") raise HTTPException(status_code=429, detail="Too many requests. Please slow down.") if not _rate_limiter.check(f"session:{session_id}", settings.rate_limit_per_session_per_minute): logger.info("rate_limited scope=session") raise HTTPException(status_code=429, detail="Too many requests. Please slow down.") try: reply = agent.run_turn(session_id, body.message) except anthropic.RateLimitError: logger.warning("anthropic_rate_limited") raise HTTPException( status_code=503, detail="Our AI provider is busy right now. Please try again in a moment.", ) except anthropic.APIConnectionError: logger.warning("anthropic_connection_error") raise HTTPException( status_code=503, detail="We couldn't reach our AI provider. Please try again in a moment.", ) except anthropic.APIStatusError as exc: logger.error("anthropic_api_error status=%s", exc.status_code) raise HTTPException( status_code=502, detail="Our AI provider returned an error. Please try again.", ) except ValueError: # Programmer-visible input errors (e.g., blank message). Surface a # 400 rather than a 500 so clients can distinguish. raise HTTPException(status_code=400, detail="Invalid request.") except Exception: logger.exception("chat_failed") raise HTTPException(status_code=500, detail="Something went wrong handling that message.") return ChatResponse(reply=reply) # Absolute path so the mount works regardless of the process working directory. _STATIC_DIR = Path(__file__).resolve().parent / "static" _ARCHITECTURE_HTML_PATH = _STATIC_DIR / "architecture.html" # Pandoc-generated literate program. The HTML comes from weaving Bookly.lit.md # and contains inline styles (and inline SVG from mermaid-filter), so the # default strict CSP must be relaxed for this one route. _ARCHITECTURE_CSP = ( "default-src 'self'; " "style-src 'self' 'unsafe-inline'; " "script-src 'none'; " "img-src 'self' data:; " "object-src 'none'; " "base-uri 'none'; " "frame-ancestors 'none'" ) @app.get("/architecture", response_class=HTMLResponse) def architecture() -> HTMLResponse: """Serve the woven literate program for the Bookly codebase.""" try: html = _ARCHITECTURE_HTML_PATH.read_text(encoding="utf-8") except FileNotFoundError: raise HTTPException( status_code=404, detail="Architecture document has not been built yet.", ) response = HTMLResponse(content=html) response.headers["Content-Security-Policy"] = _ARCHITECTURE_CSP return response app.mount("/static", StaticFiles(directory=str(_STATIC_DIR)), name="static")