bookly/tools.py
Cody Borders 3947180841 Harden security/perf, add literate program at /architecture
Security and performance fixes addressing a comprehensive review:

- Server-issued HMAC-signed session cookies; client-supplied session_id
  ignored. Prevents session hijacking via body substitution.
- Sliding-window rate limiter per IP and per session.
- SessionStore with LRU eviction, idle TTL, per-session threading locks,
  and a hard turn cap. Bounds memory and serializes concurrent turns for
  the same session so FastAPI's threadpool cannot corrupt history.
- Tool-use loop capped at settings.max_tool_use_iterations; Anthropic
  client gets an explicit timeout. No more infinite-loop credit burn.
- Every tool argument is regex-validated, length-capped, and
  control-character-stripped. asserts replaced with ValueError so -O
  cannot silently disable the checks.
- PII-safe warning logs: session IDs and reply bodies are hashed, never
  logged in clear.
- hmac.compare_digest for email comparison (constant-time).
- Strict Content-Security-Policy plus X-Content-Type-Options,
  X-Frame-Options, Referrer-Policy, Permissions-Policy via middleware.
- Explicit handlers for anthropic.RateLimitError, APIConnectionError,
  APIStatusError, ValueError; static dir resolved from __file__.
- Prompt cache breakpoints on the last tool schema and the last message
  so per-turn input cost scales linearly, not quadratically.
- TypedDict handler argument shapes; direct block.name/block.id access.
- functools.lru_cache on _get_client.
- Anchored word-boundary regexes for out-of-scope detection to kill
  false positives on phrases like "I'd recommend contacting...".

Literate program:

- Bookly.lit.md is now the single source of truth for the five core
  Python files. Tangles byte-for-byte; verified via tangle.ts --verify.
- Prose walkthrough, three mermaid diagrams, narrative per module.
- Woven to static/architecture.html with the app's palette
  (background #f5f3ee) via scripts/architecture-header.html.
- New GET /architecture route serves the HTML with a relaxed CSP that
  allows pandoc's inline styles. Available at
  bookly.codyborders.com/architecture.
- scripts/rebuild_architecture_html.sh regenerates the HTML after edits.
- code_reviews/2026-04-15-1433-code-review.md captures the review that
  drove these changes.

All 37 tests pass.
2026-04-15 15:02:40 -07:00

437 lines
16 KiB
Python

"""Tool schemas, dispatch, and Layer 3 (tool-side) guardrail enforcement.
Each tool has an Anthropic-format schema (used in the `tools` argument to
`messages.create`) and a handler. Handlers are typed with `TypedDict`s so the
contract between schema and handler is visible to the type checker; inputs
are still validated at runtime because the caller is ultimately the model.
The most important guardrail in the whole system lives here:
`handle_initiate_return` refuses unless `check_return_eligibility` has already
succeeded for the same order in the same session. This protects against the
agent skipping the protocol even if the system prompt is ignored entirely.
"""
from __future__ import annotations
import hmac
import re
import uuid
from dataclasses import dataclass, field
from datetime import date
from typing import Any, Callable, TypedDict
try:
from typing import NotRequired # Python 3.11+
except ImportError: # pragma: no cover -- Python 3.10 fallback
from typing_extensions import NotRequired # type: ignore[assignment]
from mock_data import ORDERS, POLICIES, RETURN_POLICY, RETURNS, TODAY
# ---------------------------------------------------------------------------
# Validation helpers
# ---------------------------------------------------------------------------
# Validator limits. These are deliberately tight: tool arguments come from
# model output, which in turn reflects user input, so anything that would not
# plausibly appear in a real support conversation is rejected.
#
# Two details keep the patterns as strict as they look:
#   * `[0-9]` instead of `\d` -- in a str pattern `\d` matches every Unicode
#     decimal digit (Arabic-Indic, fullwidth, ...), not just ASCII 0-9.
#   * `\Z` instead of `$` -- `$` also matches just before a trailing newline,
#     so "BK-1234\n" would pass if a caller ever forgot to strip its input.
ORDER_ID_RE = re.compile(r"^BK-[0-9]{4,6}\Z")
EMAIL_RE = re.compile(r"^[^@\s]{1,64}@[^@\s]{1,255}\.[^@\s]{1,10}\Z")
TOPIC_RE = re.compile(r"^[a-z][a-z_]{0,39}\Z")
ITEM_TITLE_MAX_LENGTH = 200
REASON_MAX_LENGTH = 500
ITEM_TITLES_MAX_COUNT = 50
# Control characters are stripped from any free-form input. Keeping them out
# of tool payloads means they cannot end up in prompts on later turns, which
# closes one prompt-injection surface. Tab (\x09), line feed (\x0a) and
# carriage return (\x0d) are intentionally not part of the class.
_CONTROL_CHAR_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]")
class ToolValidationError(ValueError):
    """Raised when a tool argument fails validation.

    The dispatcher catches this and converts it into a tool-result error so
    the model can recover on its next turn instead of crashing the request.
    Because it subclasses `ValueError`, code that only knows about
    `ValueError` still catches it.
    """
def _require_string(value: Any, field_name: str, *, max_length: int) -> str:
    """Return *value* as a sanitized, non-empty, length-bounded string.

    Control characters are removed first and surrounding whitespace is
    trimmed afterwards, so the emptiness and length checks apply to the text
    that is actually returned.

    Raises:
        ToolValidationError: if *value* is not a ``str``, is empty once
            sanitized, or is longer than *max_length* once sanitized.
    """
    if isinstance(value, str):
        text = _CONTROL_CHAR_RE.sub("", value)
        text = text.strip()
        if text == "":
            raise ToolValidationError(f"{field_name} is required")
        if len(text) <= max_length:
            return text
        raise ToolValidationError(f"{field_name} must be at most {max_length} characters")
    raise ToolValidationError(f"{field_name} must be a string")
def _require_order_id(value: Any) -> str:
    """Validate and return an order ID.

    The value goes through the generic string sanitizer (capped at 16
    characters) and must then match ``ORDER_ID_RE``.

    Raises:
        ToolValidationError: if the value is not a usable string or does not
            have the expected order-ID shape.
    """
    candidate = _require_string(value, "order_id", max_length=16)
    if ORDER_ID_RE.match(candidate) is None:
        raise ToolValidationError("order_id must match the format BK-NNNN")
    return candidate
def _require_email(value: Any, *, field_name: str = "customer_email") -> str:
    """Validate and return an email address.

    The value is sanitized as a string of at most 320 characters and must
    then match ``EMAIL_RE``. *field_name* only affects the error text.

    Raises:
        ToolValidationError: if the value is not a usable string or does not
            look like an email address.
    """
    address = _require_string(value, field_name, max_length=320)
    if EMAIL_RE.match(address):
        return address
    raise ToolValidationError(f"{field_name} is not a valid email address")
def _optional_email(value: Any, *, field_name: str = "customer_email") -> str | None:
    """Validate an email that the caller is allowed to omit.

    ``None`` passes through untouched; anything else must satisfy
    `_require_email` and raises `ToolValidationError` otherwise.
    """
    return None if value is None else _require_email(value, field_name=field_name)
def _require_topic(value: Any) -> str:
    """Validate and return a lower-cased policy topic.

    The value is sanitized as a string of at most 40 characters, lower-cased,
    and must then match ``TOPIC_RE``.

    Raises:
        ToolValidationError: if the value is not a usable string or the
            lower-cased text does not match the topic pattern.
    """
    normalized = _require_string(value, "topic", max_length=40).lower()
    if TOPIC_RE.match(normalized) is None:
        raise ToolValidationError("topic must be lowercase letters and underscores only")
    return normalized
def _optional_item_titles(value: Any) -> list[str] | None:
    """Validate the optional ``item_titles`` argument.

    ``None`` passes through untouched. Otherwise the value must be a list of
    at most ``ITEM_TITLES_MAX_COUNT`` entries, each of which is sanitized with
    `_require_string` (capped at ``ITEM_TITLE_MAX_LENGTH`` characters). An
    empty list is returned as-is; deciding what it means is left to the caller.

    Raises:
        ToolValidationError: if the value is not a list, has too many
            entries, or any entry fails string validation. The failing
            entry is identified by its index in the error text.
    """
    if value is None:
        return None
    if not isinstance(value, list):
        raise ToolValidationError("item_titles must be a list of strings")
    if len(value) > ITEM_TITLES_MAX_COUNT:
        raise ToolValidationError(f"item_titles may contain at most {ITEM_TITLES_MAX_COUNT} entries")
    return [
        _require_string(entry, f"item_titles[{position}]", max_length=ITEM_TITLE_MAX_LENGTH)
        for position, entry in enumerate(value)
    ]
def _emails_match(supplied: str | None, stored: str | None) -> bool:
    """Compare two email addresses after trimming and lower-casing both.

    A missing value on either side never matches. The normalized addresses
    are compared as UTF-8 bytes with `hmac.compare_digest` rather than `==`,
    so the comparison does not short-circuit on the first differing
    character and leak how much of the stored address was guessed correctly.
    """
    if supplied is not None and stored is not None:
        candidate = supplied.strip().lower().encode("utf-8")
        expected = stored.strip().lower().encode("utf-8")
        return hmac.compare_digest(candidate, expected)
    return False
def _is_within_return_window(delivered_date: str | None) -> tuple[bool, int | None]:
    """Report whether a delivery date still falls inside the return window.

    Returns a ``(within_window, days_since_delivery)`` pair. A missing date
    yields ``(False, None)``. Otherwise the ISO-format date is parsed, the
    number of days between it and ``TODAY`` is computed, and the order counts
    as within the window when that number does not exceed
    ``RETURN_POLICY["window_days"]``.
    """
    if delivered_date is None:
        return (False, None)
    elapsed_days = (TODAY - date.fromisoformat(delivered_date)).days
    still_returnable = elapsed_days <= RETURN_POLICY["window_days"]
    return (still_returnable, elapsed_days)
# ---------------------------------------------------------------------------
# TypedDict argument shapes
# ---------------------------------------------------------------------------
class LookupOrderArgs(TypedDict, total=False):
order_id: str
customer_email: NotRequired[str]
class CheckReturnEligibilityArgs(TypedDict):
    """Argument shape for the `check_return_eligibility` tool.

    Both keys are required; the handler still validates them at runtime.
    """

    # Order whose eligibility is being checked.
    order_id: str
    # Email the handler compares against the email stored on the order.
    customer_email: str
class InitiateReturnArgs(TypedDict, total=False):
order_id: str
customer_email: str
reason: str
item_titles: NotRequired[list[str]]
class LookupPolicyArgs(TypedDict):
    """Argument shape for the `lookup_policy` tool."""

    # Policy topic key; the handler normalizes and validates it at runtime.
    topic: str
@dataclass
class SessionGuardState:
    """Per-session protocol state used to enforce tool ordering rules.

    Sessions are short-lived chats, so plain in-memory sets are fine. A
    production deployment would back this with a session store.
    """

    # Order IDs for which `check_return_eligibility` returned eligible=True.
    eligibility_checks_passed: set[str] = field(default_factory=set)
    # Order IDs for which `initiate_return` already created a return record.
    returns_initiated: set[str] = field(default_factory=set)
# ---------------------------------------------------------------------------
# Tool schemas (Anthropic format)
# ---------------------------------------------------------------------------
# Schema for the `lookup_order` tool. Only `order_id` is mandatory;
# `customer_email` is an optional ownership check.
LOOKUP_ORDER_SCHEMA: dict[str, Any] = {
    "name": "lookup_order",
    "description": (
        "Look up the status and details of a Bookly order by order ID. Optionally pass "
        "the customer email to verify ownership before returning details. Use this "
        "whenever the customer asks about an order."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "order_id": {
                "type": "string",
                "description": "The order ID, formatted as 'BK-' followed by digits.",
            },
            "customer_email": {
                "type": "string",
                "description": "Optional email used to verify the customer owns the order.",
            },
        },
        "required": ["order_id"],
    },
}
# Schema for the `check_return_eligibility` tool. Both the order ID and the
# email are mandatory.
CHECK_RETURN_ELIGIBILITY_SCHEMA: dict[str, Any] = {
    "name": "check_return_eligibility",
    "description": (
        "Check whether an order is eligible for return. Requires both order ID and "
        "the email on the order. Must be called and succeed before initiate_return."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "order_id": {
                "type": "string",
            },
            "customer_email": {
                "type": "string",
            },
        },
        "required": [
            "order_id",
            "customer_email",
        ],
    },
}
# Schema for the `initiate_return` tool. `item_titles` is the only optional
# argument; the description tells the model to run the eligibility check and
# get customer confirmation first.
INITIATE_RETURN_SCHEMA: dict[str, Any] = {
    "name": "initiate_return",
    "description": (
        "Start a return for an order. Only call this after check_return_eligibility "
        "has succeeded for the same order in this conversation, and after the "
        "customer has confirmed they want to proceed."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "order_id": {
                "type": "string",
            },
            "customer_email": {
                "type": "string",
            },
            "reason": {
                "type": "string",
                "description": "The customer's stated reason for the return.",
            },
            "item_titles": {
                "type": "array",
                "items": {
                    "type": "string",
                },
                "description": (
                    "Optional list of specific item titles to return. "
                    "Defaults to all items."
                ),
            },
        },
        "required": [
            "order_id",
            "customer_email",
            "reason",
        ],
    },
}
# Schema for the `lookup_policy` tool; `topic` is its only argument.
LOOKUP_POLICY_SCHEMA: dict[str, Any] = {
    "name": "lookup_policy",
    "description": (
        "Look up a Bookly customer policy by topic. Use this whenever the customer "
        "asks about shipping, password reset, returns overview, or similar standard "
        "policies. Returns the verbatim policy text or topic_not_supported."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "topic": {
                "type": "string",
                "description": (
                    "Policy topic, e.g. 'shipping', 'password_reset', 'returns_overview'."
                ),
            },
        },
        "required": [
            "topic",
        ],
    },
    # Cache breakpoint. This schema is the last entry in the tools list, and
    # tagging the last tool with `cache_control` lets the prompt cache cover
    # the whole tools block, so the schemas are not re-tokenized every turn.
    # The large system prompt carries a breakpoint of its own.
    "cache_control": {"type": "ephemeral"},
}
# Every tool schema, in the order it is offered to the model. The
# policy schema stays last because it is the one carrying `cache_control`.
TOOL_SCHEMAS: list[dict[str, Any]] = [
    LOOKUP_ORDER_SCHEMA,
    CHECK_RETURN_ELIGIBILITY_SCHEMA,
    INITIATE_RETURN_SCHEMA,
    LOOKUP_POLICY_SCHEMA,
]
# ---------------------------------------------------------------------------
# Handlers
# ---------------------------------------------------------------------------
def handle_lookup_order(args: LookupOrderArgs, state: SessionGuardState) -> dict[str, Any]:
    """Look up an order by ID, optionally verifying ownership by email.

    Args:
        args: Raw tool arguments; `order_id` is required, `customer_email`
            is optional. Both are validated here.
        state: Per-session guard state. Unused by this handler; accepted so
            every handler shares one signature.

    Returns:
        ``{"order": ...}`` on success, or an ``order_not_found`` error dict
        when the ID is unknown or a supplied email does not match. When no
        email is supplied, the returned order omits its ``"email"`` key.

    Raises:
        ToolValidationError: if an argument fails validation.
    """
    order_id = _require_order_id(args.get("order_id"))
    customer_email = _optional_email(args.get("customer_email"))
    order = ORDERS.get(order_id)
    if order is None:
        return {"error": "order_not_found", "message": f"No order found with ID {order_id}."}
    if customer_email is None:
        # Ownership was not verified. The stored email is exactly what the
        # return tools demand as proof of ownership, so handing it back here
        # would let anyone who knows an order ID learn it and then pass the
        # email check. Return a copy of the order without that field.
        # NOTE(review): other fields may also be sensitive -- confirm against
        # the order schema in mock_data.
        unverified_view = {key: value for key, value in order.items() if key != "email"}
        return {"order": unverified_view}
    # Privacy: when an email is supplied and does not match, return the same
    # error as a missing order so callers cannot enumerate which IDs exist.
    if not _emails_match(customer_email, order["email"]):
        return {"error": "order_not_found", "message": f"No order found with ID {order_id}."}
    return {"order": order}
def handle_check_return_eligibility(
    args: CheckReturnEligibilityArgs, state: SessionGuardState
) -> dict[str, Any]:
    """Decide whether an order may be returned and record a pass in *state*.

    Args:
        args: Raw tool arguments; `order_id` and `customer_email` are both
            required and validated here.
        state: Per-session guard state. On an eligible result the order ID is
            added to ``state.eligibility_checks_passed``.

    Returns:
        An ``auth_failed`` error dict when the order is unknown or the email
        does not match (the two cases are deliberately indistinguishable);
        otherwise a dict with ``eligible``, ``reason`` and ``policy``, plus
        ``items`` when the order is eligible.

    Raises:
        ToolValidationError: if an argument fails validation.
    """
    order_id = _require_order_id(args.get("order_id"))
    customer_email = _require_email(args.get("customer_email"))
    order = ORDERS.get(order_id)
    if order is None or not _emails_match(customer_email, order["email"]):
        return {
            "error": "auth_failed",
            "message": "Could not verify that order ID and email together. Please double-check both.",
        }
    if order["status"] != "delivered":
        return {
            "eligible": False,
            "reason": (
                f"This order has status '{order['status']}', not 'delivered'. "
                "Returns can only be started after an order has been delivered."
            ),
            "policy": RETURN_POLICY,
        }
    within_window, days_since = _is_within_return_window(order.get("delivered_date"))
    if days_since is None:
        # A delivered order without a delivery date cannot be placed inside
        # or outside the window. Say so, rather than falling through to the
        # branch below and telling the customer it was delivered "None days ago".
        return {
            "eligible": False,
            "reason": (
                "This order is marked as delivered but has no recorded delivery date, "
                "so the return window cannot be verified."
            ),
            "policy": RETURN_POLICY,
        }
    if not within_window:
        return {
            "eligible": False,
            "reason": (
                f"This order was delivered {days_since} days ago, which is outside the "
                f"{RETURN_POLICY['window_days']}-day return window."
            ),
            "policy": RETURN_POLICY,
        }
    # Record the pass; `initiate_return` refuses orders that are not in this set.
    state.eligibility_checks_passed.add(order_id)
    return {
        "eligible": True,
        "reason": (
            f"Order delivered {days_since} days ago, within the "
            f"{RETURN_POLICY['window_days']}-day window."
        ),
        "items": order["items"],
        "policy": RETURN_POLICY,
    }
def _resolve_item_titles(requested: list[str], order_items: list[Any]) -> list[str] | None:
    """Map requested titles onto the titles actually present on the order.

    Matching ignores case. The result uses the order's own spelling of each
    title, keeps the request order, and collapses repeated titles. Returns
    ``None`` as soon as one requested title is not on the order.
    """
    # Assumes every order item is a mapping with a string "title" -- the
    # default-to-all-items path in the handler reads the same key.
    on_order = {item["title"].casefold(): item["title"] for item in order_items}
    resolved: list[str] = []
    for title in requested:
        match = on_order.get(title.casefold())
        if match is None:
            return None
        if match not in resolved:
            resolved.append(match)
    return resolved


def handle_initiate_return(args: InitiateReturnArgs, state: SessionGuardState) -> dict[str, Any]:
    """Create a return record for an order that already passed eligibility.

    Args:
        args: Raw tool arguments; `order_id`, `customer_email` and `reason`
            are required, `item_titles` is optional. All are validated here.
        state: Per-session guard state. The order must already be in
            ``state.eligibility_checks_passed``; on success the order ID is
            added to ``state.returns_initiated``.

    Returns:
        The new return record (also stored in ``RETURNS`` under its
        ``return_id``), or an error dict whose ``error`` is one of
        ``eligibility_not_verified``, ``already_initiated``, ``auth_failed``,
        ``no_items_selected`` or ``unknown_items``.

    Raises:
        ToolValidationError: if an argument fails validation.
    """
    order_id = _require_order_id(args.get("order_id"))
    customer_email = _require_email(args.get("customer_email"))
    reason = _require_string(args.get("reason"), "reason", max_length=REASON_MAX_LENGTH)
    item_titles = _optional_item_titles(args.get("item_titles"))
    # Layer 3 protocol guard: the agent must have called check_return_eligibility
    # for this exact order in this session, and it must have passed.
    if order_id not in state.eligibility_checks_passed:
        return {
            "error": "eligibility_not_verified",
            "message": (
                "Cannot initiate a return without a successful eligibility check for this "
                "order in the current session. Call check_return_eligibility first."
            ),
        }
    if order_id in state.returns_initiated:
        return {
            "error": "already_initiated",
            "message": "A return has already been initiated for this order in this session.",
        }
    order = ORDERS.get(order_id)
    # Paired check: eligibility was verified against the same order, but the
    # email is re-verified here because the eligibility set is keyed by order
    # ID only, and so a future edit that makes ORDERS mutable cannot silently
    # break the email-binding guarantee.
    if order is None or not _emails_match(customer_email, order["email"]):
        return {"error": "auth_failed", "message": "Order/email mismatch."}
    # Explicit: an empty list means "no items selected" (a caller error we
    # reject) while `None` means "default to all items on the order".
    if item_titles is not None and not item_titles:
        return {"error": "no_items_selected", "message": "item_titles cannot be an empty list."}
    if item_titles is None:
        titles = [item["title"] for item in order["items"]]
    else:
        # The titles come from model output. Without this check a return
        # could be recorded for items that were never part of the order.
        resolved = _resolve_item_titles(item_titles, order["items"])
        if resolved is None:
            return {
                "error": "unknown_items",
                # Fixed text plus the order's own titles: nothing the caller
                # supplied is echoed back.
                "message": "One or more item_titles are not on this order.",
                "available_items": [item["title"] for item in order["items"]],
            }
        titles = resolved
    return_id = f"RMA-{uuid.uuid4().hex[:8].upper()}"
    # Eight hex characters can collide; never overwrite an existing record.
    while return_id in RETURNS:
        return_id = f"RMA-{uuid.uuid4().hex[:8].upper()}"
    record = {
        "return_id": return_id,
        "order_id": order_id,
        "customer_email": order["email"],
        "items": titles,
        "reason": reason,
        "refund_method": RETURN_POLICY["refund_method"],
        "refund_timeline_days": RETURN_POLICY["refund_timeline_days"],
        "next_steps": (
            "We've emailed a prepaid shipping label to the address on file. Drop the package at "
            "any carrier location within 14 days. Your refund will post within "
            f"{RETURN_POLICY['refund_timeline_days']} business days of us receiving the return."
        ),
    }
    RETURNS[return_id] = record
    state.returns_initiated.add(order_id)
    return record
def handle_lookup_policy(args: LookupPolicyArgs, state: SessionGuardState) -> dict[str, Any]:
    """Return the stored policy text for a topic.

    Args:
        args: Raw tool arguments; `topic` is required and validated here.
        state: Per-session guard state. Unused by this handler; accepted so
            every handler shares one signature.

    Returns:
        ``{"topic": ..., "text": ...}`` when ``POLICIES`` has an entry for the
        normalized topic, otherwise a ``topic_not_supported`` error dict that
        lists the available topics in sorted order.

    Raises:
        ToolValidationError: if the topic fails validation.
    """
    topic = _require_topic(args.get("topic"))
    policy_text = POLICIES.get(topic)
    if policy_text is not None:
        return {"topic": topic, "text": policy_text}
    # The message interpolates the validated, lower-cased topic rather than
    # the raw argument, so nothing the caller injected is reflected back into
    # model context.
    return {
        "error": "topic_not_supported",
        "message": f"No policy entry for topic '{topic}'.",
        "available_topics": sorted(POLICIES),
    }
# ---------------------------------------------------------------------------
# Dispatch
# ---------------------------------------------------------------------------
# Tool name -> handler. The keys are the `name` values of the tool schemas;
# every handler takes (args, state) and returns a result dict.
_HANDLERS: dict[str, Callable[[Any, SessionGuardState], dict[str, Any]]] = {
    "lookup_order": handle_lookup_order,
    "check_return_eligibility": handle_check_return_eligibility,
    "initiate_return": handle_initiate_return,
    "lookup_policy": handle_lookup_policy,
}
def dispatch_tool(name: str, args: dict[str, Any], state: SessionGuardState) -> dict[str, Any]:
    """Route one tool call to its handler and return the handler's result.

    Args:
        name: Tool name as requested by the model.
        args: Tool arguments as produced by the model; must be a dict.
        state: Per-session guard state passed through to the handler.

    Returns:
        The handler's result dict, or an error dict: ``unknown_tool`` when no
        handler is registered under *name*, ``invalid_arguments`` when *args*
        is not a dict or the handler rejects an argument.
    """
    handler = _HANDLERS.get(name)
    if handler is None:
        return {"error": "unknown_tool", "message": f"No tool named {name}."}
    if not isinstance(args, dict):
        return {"error": "invalid_arguments", "message": "Tool arguments must be an object."}
    try:
        result = handler(args, state)
    except ToolValidationError as exc:
        # Hand validation failures back as a structured tool error so the
        # model can correct itself on its next turn. Passing the exception
        # text through is acceptable because the validators build their
        # messages from field names and fixed wording, not from the rejected
        # value.
        return {"error": "invalid_arguments", "message": str(exc)}
    else:
        return result