"""Tool schemas, dispatch, and Layer 3 (tool-side) guardrail enforcement. Each tool has an Anthropic-format schema (used in the `tools` argument to `messages.create`) and a handler. Handlers are pure functions of (args, state), so they are trivial to unit test and the only mutable state lives in `SessionGuardState` and the module-level `RETURNS` dict. The most important guardrail in the whole system lives here: `handle_initiate_return` refuses unless `check_return_eligibility` has already succeeded for the same order in the same session. This protects against the agent skipping the protocol even if the system prompt is ignored entirely. """ from __future__ import annotations import uuid from dataclasses import dataclass, field from datetime import date from typing import Any, Callable from mock_data import ORDERS, POLICIES, RETURN_POLICY, RETURNS, TODAY @dataclass class SessionGuardState: """Per-session protocol state used to enforce tool ordering rules. Sessions are short-lived chats, so plain in-memory sets are fine. A production deployment would back this with a session store. """ eligibility_checks_passed: set[str] = field(default_factory=set) returns_initiated: set[str] = field(default_factory=set) # --------------------------------------------------------------------------- # Tool schemas (Anthropic format) # --------------------------------------------------------------------------- TOOL_SCHEMAS: list[dict[str, Any]] = [ { "name": "lookup_order", "description": ( "Look up the status and details of a Bookly order by order ID. " "Optionally pass the customer email to verify ownership before returning details. " "Use this whenever the customer asks about an order." ), "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "The order ID, formatted as 'BK-' followed by digits.", }, "customer_email": { "type": "string", "description": "Optional email used to verify the customer owns the order.", }, }, "required": ["order_id"], }, }, { "name": "check_return_eligibility", "description": ( "Check whether an order is eligible for return. Requires both order ID and the email " "on the order. Must be called and succeed before initiate_return." ), "input_schema": { "type": "object", "properties": { "order_id": {"type": "string"}, "customer_email": {"type": "string"}, }, "required": ["order_id", "customer_email"], }, }, { "name": "initiate_return", "description": ( "Start a return for an order. Only call this after check_return_eligibility has " "succeeded for the same order in this conversation, and after the customer has " "confirmed they want to proceed." ), "input_schema": { "type": "object", "properties": { "order_id": {"type": "string"}, "customer_email": {"type": "string"}, "reason": { "type": "string", "description": "The customer's stated reason for the return.", }, "item_titles": { "type": "array", "items": {"type": "string"}, "description": "Optional list of specific item titles to return. Defaults to all items.", }, }, "required": ["order_id", "customer_email", "reason"], }, }, { "name": "lookup_policy", "description": ( "Look up a Bookly customer policy by topic. Use this whenever the customer asks " "about shipping, password reset, returns overview, or similar standard policies. " "Returns the verbatim policy text or topic_not_supported." ), "input_schema": { "type": "object", "properties": { "topic": { "type": "string", "description": "Policy topic, e.g. 'shipping', 'password_reset', 'returns_overview'.", }, }, "required": ["topic"], }, }, ] # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _emails_match(a: str | None, b: str | None) -> bool: if a is None or b is None: return False return a.strip().lower() == b.strip().lower() def _is_within_return_window(delivered_date: str | None) -> tuple[bool, int | None]: """Return (within_window, days_since_delivery).""" if delivered_date is None: return (False, None) delivered = date.fromisoformat(delivered_date) days_since = (TODAY - delivered).days return (days_since <= RETURN_POLICY["window_days"], days_since) # --------------------------------------------------------------------------- # Handlers # --------------------------------------------------------------------------- def handle_lookup_order(args: dict, state: SessionGuardState) -> dict: order_id = args.get("order_id") customer_email = args.get("customer_email") assert isinstance(order_id, str) and order_id, "order_id is required" order = ORDERS.get(order_id) if order is None: return {"error": "order_not_found", "message": f"No order found with ID {order_id}."} # Privacy: when an email is supplied and does not match, return the same # error as a missing order so callers cannot enumerate which IDs exist. if customer_email is not None and not _emails_match(customer_email, order["email"]): return {"error": "order_not_found", "message": f"No order found with ID {order_id}."} return {"order": order} def handle_check_return_eligibility(args: dict, state: SessionGuardState) -> dict: order_id = args.get("order_id") customer_email = args.get("customer_email") assert isinstance(order_id, str) and order_id, "order_id is required" assert isinstance(customer_email, str) and customer_email, "customer_email is required" order = ORDERS.get(order_id) if order is None or not _emails_match(customer_email, order["email"]): return { "error": "auth_failed", "message": "Could not verify that order ID and email together. Please double-check both.", } if order["status"] != "delivered": return { "eligible": False, "reason": ( f"This order has status '{order['status']}', not 'delivered'. " "Returns can only be started after an order has been delivered." ), "policy": RETURN_POLICY, } within_window, days_since = _is_within_return_window(order.get("delivered_date")) if not within_window: return { "eligible": False, "reason": ( f"This order was delivered {days_since} days ago, which is outside the " f"{RETURN_POLICY['window_days']}-day return window." ), "policy": RETURN_POLICY, } state.eligibility_checks_passed.add(order_id) return { "eligible": True, "reason": ( f"Order delivered {days_since} days ago, within the " f"{RETURN_POLICY['window_days']}-day window." ), "items": order["items"], "policy": RETURN_POLICY, } def handle_initiate_return(args: dict, state: SessionGuardState) -> dict: order_id = args.get("order_id") customer_email = args.get("customer_email") reason = args.get("reason") item_titles = args.get("item_titles") assert isinstance(order_id, str) and order_id, "order_id is required" assert isinstance(customer_email, str) and customer_email, "customer_email is required" assert isinstance(reason, str) and reason, "reason is required" # Layer 3 protocol guard: the agent must have called check_return_eligibility # for this exact order in this session, and it must have passed. if order_id not in state.eligibility_checks_passed: return { "error": "eligibility_not_verified", "message": ( "Cannot initiate a return without a successful eligibility check for this " "order in the current session. Call check_return_eligibility first." ), } if order_id in state.returns_initiated: return { "error": "already_initiated", "message": "A return has already been initiated for this order in this session.", } order = ORDERS.get(order_id) # If the order disappeared between eligibility check and now, fail loudly. if order is None or not _emails_match(customer_email, order["email"]): return {"error": "auth_failed", "message": "Order/email mismatch."} titles = item_titles or [item["title"] for item in order["items"]] return_id = f"RMA-{uuid.uuid4().hex[:8].upper()}" record = { "return_id": return_id, "order_id": order_id, "customer_email": order["email"], "items": titles, "reason": reason, "refund_method": RETURN_POLICY["refund_method"], "refund_timeline_days": RETURN_POLICY["refund_timeline_days"], "next_steps": ( "We've emailed a prepaid shipping label to the address on file. Drop the package at " "any carrier location within 14 days. Your refund will post within " f"{RETURN_POLICY['refund_timeline_days']} business days of us receiving the return." ), } RETURNS[return_id] = record state.returns_initiated.add(order_id) return record def handle_lookup_policy(args: dict, state: SessionGuardState) -> dict: topic = args.get("topic") assert isinstance(topic, str) and topic, "topic is required" text = POLICIES.get(topic.strip().lower()) if text is None: return { "error": "topic_not_supported", "message": f"No policy entry for topic '{topic}'.", "available_topics": sorted(POLICIES.keys()), } return {"topic": topic, "text": text} # --------------------------------------------------------------------------- # Dispatch # --------------------------------------------------------------------------- _HANDLERS: dict[str, Callable[[dict, SessionGuardState], dict]] = { "lookup_order": handle_lookup_order, "check_return_eligibility": handle_check_return_eligibility, "initiate_return": handle_initiate_return, "lookup_policy": handle_lookup_policy, } def dispatch_tool(name: str, args: dict, state: SessionGuardState) -> dict: handler = _HANDLERS.get(name) if handler is None: return {"error": "unknown_tool", "message": f"No tool named {name}."} return handler(args, state)