"""Tool schemas, dispatch, and Layer 3 (tool-side) guardrail enforcement.

Each tool has an Anthropic-format schema (used in the `tools` argument to
`messages.create`) and a handler. Handlers are typed with `TypedDict`s so the
contract between schema and handler is visible to the type checker; inputs are
still validated at runtime because the caller is ultimately the model.

The most important guardrail in the whole system lives here:
`handle_initiate_return` refuses unless `check_return_eligibility` has already
succeeded for the same order in the same session. This protects against the
agent skipping the protocol even if the system prompt is ignored entirely.
"""

from __future__ import annotations

import hmac
import re
import uuid
from dataclasses import dataclass, field
from datetime import date
from typing import Any, Callable, TypedDict

try:
    from typing import NotRequired  # Python 3.11+
except ImportError:  # pragma: no cover -- Python 3.10 fallback
    from typing_extensions import NotRequired  # type: ignore[assignment]

from mock_data import ORDERS, POLICIES, RETURN_POLICY, RETURNS, TODAY

# ---------------------------------------------------------------------------
# Validation helpers
# ---------------------------------------------------------------------------

# Validator limits. These are deliberately tight: tool arguments come from
# model output, which in turn reflects user input, so anything that would not
# plausibly appear in a real support conversation is rejected.
#
# The patterns use `[0-9]` rather than `\d` (which, for str patterns, also
# matches non-ASCII decimal digits such as fullwidth or Arabic-Indic numerals)
# and `\Z` rather than `$` (which also matches just before a trailing newline),
# so only the exact ASCII forms are accepted.
ORDER_ID_RE = re.compile(r"^BK-[0-9]{4,6}\Z")
EMAIL_RE = re.compile(r"^[^@\s]{1,64}@[^@\s]{1,255}\.[^@\s]{1,10}\Z")
TOPIC_RE = re.compile(r"^[a-z][a-z_]{0,39}\Z")
ITEM_TITLE_MAX_LENGTH = 200
REASON_MAX_LENGTH = 500
ITEM_TITLES_MAX_COUNT = 50

# Control characters are stripped from any free-form input. Keeping them out
# of tool payloads means they cannot end up in prompts on later turns, which
# closes one prompt-injection surface.
_CONTROL_CHAR_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]") class ToolValidationError(ValueError): """Raised when a tool argument fails validation. The dispatcher catches this and converts it into a tool-result error so the model can recover on its next turn instead of crashing the request. """ def _require_string(value: Any, field_name: str, *, max_length: int) -> str: if not isinstance(value, str): raise ToolValidationError(f"{field_name} must be a string") cleaned = _CONTROL_CHAR_RE.sub("", value).strip() if not cleaned: raise ToolValidationError(f"{field_name} is required") if len(cleaned) > max_length: raise ToolValidationError(f"{field_name} must be at most {max_length} characters") return cleaned def _require_order_id(value: Any) -> str: order_id = _require_string(value, "order_id", max_length=16) if not ORDER_ID_RE.match(order_id): raise ToolValidationError("order_id must match the format BK-NNNN") return order_id def _require_email(value: Any, *, field_name: str = "customer_email") -> str: email = _require_string(value, field_name, max_length=320) if not EMAIL_RE.match(email): raise ToolValidationError(f"{field_name} is not a valid email address") return email def _optional_email(value: Any, *, field_name: str = "customer_email") -> str | None: if value is None: return None return _require_email(value, field_name=field_name) def _require_topic(value: Any) -> str: topic = _require_string(value, "topic", max_length=40) topic = topic.lower() if not TOPIC_RE.match(topic): raise ToolValidationError("topic must be lowercase letters and underscores only") return topic def _optional_item_titles(value: Any) -> list[str] | None: if value is None: return None if not isinstance(value, list): raise ToolValidationError("item_titles must be a list of strings") if len(value) > ITEM_TITLES_MAX_COUNT: raise ToolValidationError(f"item_titles may contain at most {ITEM_TITLES_MAX_COUNT} entries") cleaned: list[str] = [] for index, entry in enumerate(value): 
cleaned.append(_require_string(entry, f"item_titles[{index}]", max_length=ITEM_TITLE_MAX_LENGTH)) return cleaned def _emails_match(supplied: str | None, stored: str | None) -> bool: """Constant-time email comparison with normalization. Returns False if either side is missing. Uses `hmac.compare_digest` to close the timing side-channel that would otherwise leak the correct prefix of a stored email. """ if supplied is None or stored is None: return False supplied_norm = supplied.strip().lower().encode("utf-8") stored_norm = stored.strip().lower().encode("utf-8") return hmac.compare_digest(supplied_norm, stored_norm) def _is_within_return_window(delivered_date: str | None) -> tuple[bool, int | None]: """Return (within_window, days_since_delivery).""" if delivered_date is None: return (False, None) delivered = date.fromisoformat(delivered_date) days_since = (TODAY - delivered).days return (days_since <= RETURN_POLICY["window_days"], days_since) # --------------------------------------------------------------------------- # TypedDict argument shapes # --------------------------------------------------------------------------- class LookupOrderArgs(TypedDict, total=False): order_id: str customer_email: NotRequired[str] class CheckReturnEligibilityArgs(TypedDict): order_id: str customer_email: str class InitiateReturnArgs(TypedDict, total=False): order_id: str customer_email: str reason: str item_titles: NotRequired[list[str]] class LookupPolicyArgs(TypedDict): topic: str @dataclass class SessionGuardState: """Per-session protocol state used to enforce tool ordering rules. Sessions are short-lived chats, so plain in-memory sets are fine. A production deployment would back this with a session store. 
""" eligibility_checks_passed: set[str] = field(default_factory=set) returns_initiated: set[str] = field(default_factory=set) # --------------------------------------------------------------------------- # Tool schemas (Anthropic format) # --------------------------------------------------------------------------- LOOKUP_ORDER_SCHEMA: dict[str, Any] = { "name": "lookup_order", "description": ( "Look up the status and details of a Bookly order by order ID. " "Optionally pass the customer email to verify ownership before returning details. " "Use this whenever the customer asks about an order." ), "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "The order ID, formatted as 'BK-' followed by digits.", }, "customer_email": { "type": "string", "description": "Optional email used to verify the customer owns the order.", }, }, "required": ["order_id"], }, } CHECK_RETURN_ELIGIBILITY_SCHEMA: dict[str, Any] = { "name": "check_return_eligibility", "description": ( "Check whether an order is eligible for return. Requires both order ID and the email " "on the order. Must be called and succeed before initiate_return." ), "input_schema": { "type": "object", "properties": { "order_id": {"type": "string"}, "customer_email": {"type": "string"}, }, "required": ["order_id", "customer_email"], }, } INITIATE_RETURN_SCHEMA: dict[str, Any] = { "name": "initiate_return", "description": ( "Start a return for an order. Only call this after check_return_eligibility has " "succeeded for the same order in this conversation, and after the customer has " "confirmed they want to proceed." ), "input_schema": { "type": "object", "properties": { "order_id": {"type": "string"}, "customer_email": {"type": "string"}, "reason": { "type": "string", "description": "The customer's stated reason for the return.", }, "item_titles": { "type": "array", "items": {"type": "string"}, "description": "Optional list of specific item titles to return. 
Defaults to all items.", }, }, "required": ["order_id", "customer_email", "reason"], }, } LOOKUP_POLICY_SCHEMA: dict[str, Any] = { "name": "lookup_policy", "description": ( "Look up a Bookly customer policy by topic. Use this whenever the customer asks " "about shipping, password reset, returns overview, or similar standard policies. " "Returns the verbatim policy text or topic_not_supported." ), "input_schema": { "type": "object", "properties": { "topic": { "type": "string", "description": "Policy topic, e.g. 'shipping', 'password_reset', 'returns_overview'.", }, }, "required": ["topic"], }, # Cache breakpoint: marking the last tool with `cache_control` extends the # prompt cache over the whole tools block so schemas are not re-tokenized # on every turn. The big system prompt already has its own breakpoint. "cache_control": {"type": "ephemeral"}, } TOOL_SCHEMAS: list[dict[str, Any]] = [ LOOKUP_ORDER_SCHEMA, CHECK_RETURN_ELIGIBILITY_SCHEMA, INITIATE_RETURN_SCHEMA, LOOKUP_POLICY_SCHEMA, ] # --------------------------------------------------------------------------- # Handlers # --------------------------------------------------------------------------- def handle_lookup_order(args: LookupOrderArgs, state: SessionGuardState) -> dict[str, Any]: order_id = _require_order_id(args.get("order_id")) customer_email = _optional_email(args.get("customer_email")) order = ORDERS.get(order_id) if order is None: return {"error": "order_not_found", "message": f"No order found with ID {order_id}."} # Privacy: when an email is supplied and does not match, return the same # error as a missing order so callers cannot enumerate which IDs exist. 
if customer_email is not None and not _emails_match(customer_email, order["email"]): return {"error": "order_not_found", "message": f"No order found with ID {order_id}."} return {"order": order} def handle_check_return_eligibility( args: CheckReturnEligibilityArgs, state: SessionGuardState ) -> dict[str, Any]: order_id = _require_order_id(args.get("order_id")) customer_email = _require_email(args.get("customer_email")) order = ORDERS.get(order_id) if order is None or not _emails_match(customer_email, order["email"]): return { "error": "auth_failed", "message": "Could not verify that order ID and email together. Please double-check both.", } if order["status"] != "delivered": return { "eligible": False, "reason": ( f"This order has status '{order['status']}', not 'delivered'. " "Returns can only be started after an order has been delivered." ), "policy": RETURN_POLICY, } within_window, days_since = _is_within_return_window(order.get("delivered_date")) if not within_window: return { "eligible": False, "reason": ( f"This order was delivered {days_since} days ago, which is outside the " f"{RETURN_POLICY['window_days']}-day return window." ), "policy": RETURN_POLICY, } state.eligibility_checks_passed.add(order_id) return { "eligible": True, "reason": ( f"Order delivered {days_since} days ago, within the " f"{RETURN_POLICY['window_days']}-day window." ), "items": order["items"], "policy": RETURN_POLICY, } def handle_initiate_return(args: InitiateReturnArgs, state: SessionGuardState) -> dict[str, Any]: order_id = _require_order_id(args.get("order_id")) customer_email = _require_email(args.get("customer_email")) reason = _require_string(args.get("reason"), "reason", max_length=REASON_MAX_LENGTH) item_titles = _optional_item_titles(args.get("item_titles")) # Layer 3 protocol guard: the agent must have called check_return_eligibility # for this exact order in this session, and it must have passed. 
if order_id not in state.eligibility_checks_passed: return { "error": "eligibility_not_verified", "message": ( "Cannot initiate a return without a successful eligibility check for this " "order in the current session. Call check_return_eligibility first." ), } if order_id in state.returns_initiated: return { "error": "already_initiated", "message": "A return has already been initiated for this order in this session.", } order = ORDERS.get(order_id) # Paired assertion: we already checked eligibility against the same order, # but re-verify here so a future edit that makes ORDERS mutable cannot # silently break the email-binding guarantee. if order is None or not _emails_match(customer_email, order["email"]): return {"error": "auth_failed", "message": "Order/email mismatch."} # Explicit: an empty list means "no items selected" (a caller error we # reject) while `None` means "default to all items on the order". if item_titles is not None and not item_titles: return {"error": "no_items_selected", "message": "item_titles cannot be an empty list."} titles = item_titles if item_titles is not None else [item["title"] for item in order["items"]] return_id = f"RMA-{uuid.uuid4().hex[:8].upper()}" record = { "return_id": return_id, "order_id": order_id, "customer_email": order["email"], "items": titles, "reason": reason, "refund_method": RETURN_POLICY["refund_method"], "refund_timeline_days": RETURN_POLICY["refund_timeline_days"], "next_steps": ( "We've emailed a prepaid shipping label to the address on file. Drop the package at " "any carrier location within 14 days. Your refund will post within " f"{RETURN_POLICY['refund_timeline_days']} business days of us receiving the return." 
), } RETURNS[return_id] = record state.returns_initiated.add(order_id) return record def handle_lookup_policy(args: LookupPolicyArgs, state: SessionGuardState) -> dict[str, Any]: topic = _require_topic(args.get("topic")) text = POLICIES.get(topic) if text is None: return { "error": "topic_not_supported", # Echo the normalized topic, not the raw input, so nothing the # caller injected is ever reflected back into model context. "message": f"No policy entry for topic '{topic}'.", "available_topics": sorted(POLICIES.keys()), } return {"topic": topic, "text": text} # --------------------------------------------------------------------------- # Dispatch # --------------------------------------------------------------------------- _HANDLERS: dict[str, Callable[[Any, SessionGuardState], dict[str, Any]]] = { "lookup_order": handle_lookup_order, "check_return_eligibility": handle_check_return_eligibility, "initiate_return": handle_initiate_return, "lookup_policy": handle_lookup_policy, } def dispatch_tool(name: str, args: dict[str, Any], state: SessionGuardState) -> dict[str, Any]: handler = _HANDLERS.get(name) if handler is None: return {"error": "unknown_tool", "message": f"No tool named {name}."} if not isinstance(args, dict): return {"error": "invalid_arguments", "message": "Tool arguments must be an object."} try: return handler(args, state) except ToolValidationError as exc: # Return validation errors as structured tool errors so the model can # recover. Never surface the message verbatim from untrusted input -- # `_require_string` already stripped control characters, and the error # messages themselves are constructed from field names, not user data. return {"error": "invalid_arguments", "message": str(exc)}