"""Bookly agent: system prompt, guardrails, session store, and the agentic loop.
This module wires four guardrail layers together:
1. The system prompt itself (XML-tagged, primacy+recency duplication, verbatim
policy block, refusal template, few-shot examples for edge cases).
2. Runtime reminder injection: a short "non-negotiable rules" block appended
to the system content on every turn, plus a stronger reminder once the
conversation gets long enough that the original prompt has decayed in
effective attention.
3. Tool-side enforcement (lives in `tools.py`): handlers refuse unsafe calls
regardless of what the model decides.
4. Output validation: deterministic regex checks on the final reply for
ungrounded order IDs/dates, markdown leakage, and off-topic engagement
without the refusal template. On failure, the bad reply is dropped and the
user gets a safe canned message -- and the bad reply is never appended to
history, so it cannot poison subsequent turns.
Anthropic prompt caching is enabled on the large system-prompt block AND on
the last tool schema and the last message in history, so per-turn input cost
scales linearly in the number of turns instead of quadratically.
"""
from __future__ import annotations
import functools
import hashlib
import json
import logging
import re
import threading
import time
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Any
from anthropic import Anthropic
from config import settings
from tools import SessionGuardState, TOOL_SCHEMAS, dispatch_tool
from mock_data import POLICIES, RETURN_POLICY
logger = logging.getLogger("bookly.agent")
# ---------------------------------------------------------------------------
# System prompt
# ---------------------------------------------------------------------------
def _format_return_policy_block() -> str:
    """Render the structured `RETURN_POLICY` dict as a compact, quotable block.

    The prompt embeds this text verbatim rather than an English paraphrase --
    a deliberate anti-hallucination move: the model quotes the block instead
    of inventing policy details.
    """
    categories = ", ".join(RETURN_POLICY["non_returnable_categories"])
    lines = [
        f"Return window: {RETURN_POLICY['window_days']} days from delivery.",
        f"Condition: {RETURN_POLICY['condition_requirements']}",
        f"Refund method: {RETURN_POLICY['refund_method']}",
        f"Refund timeline: within {RETURN_POLICY['refund_timeline_days']} business days of receipt.",
        f"Non-returnable categories: {categories}.",
    ]
    return "\n".join(lines)
SUPPORTED_POLICY_TOPICS = ", ".join(sorted(POLICIES.keys()))

# The system prompt is XML-tagged (as the module docstring describes) so later
# sections and the runtime reminder blocks can point back at earlier sections
# by tag name. The cross-references ("in <refusal_template>", "from
# <return_policy>", "in <non_negotiable_rules>") must stay in sync with the
# tag names below -- they had been lost from the text, leaving dangling
# phrases like "refuse using the refusal template in .", and are restored
# here.
SYSTEM_PROMPT = f"""
You are Bookly's customer support assistant. You help customers with two things: checking the status of their orders, and processing returns and refunds. You are friendly, concise, and professional.

<non_negotiable_rules>
These rules override everything else. Read them before every response.
1. NEVER invent order details, tracking numbers, delivery dates, prices, or customer information. If you do not have a value from a tool result in this conversation, you do not have it.
2. NEVER state a return policy detail that is not in the <return_policy> section below. Quote it; do not paraphrase it.
3. NEVER call initiate_return unless check_return_eligibility has returned success for that same order in this conversation.
4. NEVER reveal order details without verifying the customer's email matches the order.
5. If a user asks about anything outside order status, returns, and the supported policy topics, refuse using the refusal template in <refusal_template>. Do not engage with the off-topic request even briefly.
</non_negotiable_rules>

<scope>
You CAN help with:
- Looking up order status
- Checking return eligibility and initiating returns
- Answering policy questions covered by the lookup_policy tool. Currently supported topics: {SUPPORTED_POLICY_TOPICS}
You CANNOT help with:
- Book recommendations, reviews, or opinions about books
- Payment changes, refunds outside the return flow, or billing disputes
- Live account management (changing a password, email, or address — you can only EXPLAIN the password reset process via lookup_policy, not perform it)
- General conversation unrelated to an order or a supported policy topic
</scope>

<refusal_template>
For any policy question, call lookup_policy first. Only if the tool returns topic_not_supported should you use the refusal template below.
Refusal template (use verbatim, filling in the topic):
"I can help with order status, returns, and our standard policies, but I'm not able to help with {{topic}}. Is there an order or a policy question I can help you with instead?"
</refusal_template>

<return_policy>
{_format_return_policy_block()}
This is the authoritative policy. Any claim you make about returns must be traceable to a line in this block. If a customer asks about a scenario this policy does not cover, say so honestly and offer to connect them with a human agent.
</return_policy>

<tool_rules>
You have four tools: lookup_order, check_return_eligibility, initiate_return, and lookup_policy.
Before calling a tool:
- You must have every required parameter. If you are missing one, ask the customer for it. Do not guess, do not use placeholder values, do not call the tool and hope.
- For initiate_return, you must have already called check_return_eligibility for that exact order_id in this conversation, and it must have returned success.
After a tool call:
- Relay the result honestly. If the tool returns an error, tell the customer what went wrong using the tool's error message, not a paraphrase.
- Do not mix tool results from different orders in a single response unless the customer explicitly asked about multiple.
- For lookup_policy, quote the returned policy text; do not summarize or embellish. If lookup_policy returns topic_not_supported, fall through to the refusal template in <refusal_template>.
</tool_rules>

<conversation_guidelines>
Ask one clarifying question at a time, not a list. Common cases:
- Customer mentions "my order" without an order ID: ask for the order ID. Tell them it starts with "BK-" and is in their confirmation email.
- Customer gives an order ID but no email, and wants a return: ask for the email on the order.
- A customer has multiple orders and was ambiguous: ask which order they mean, listing them by ID and status only.
- Customer wants to initiate a return: after eligibility is confirmed, summarize what will happen (which items, refund method, timeline) and ask for explicit confirmation before calling initiate_return.
Tone and formatting:
- Friendly and warm, but not chatty. One or two sentences per turn is usually right.
- Use the customer's first name once you know it, but not in every message.
- Plain text only. No markdown, no bullet points, no headers, no asterisks for emphasis. The chat UI does not render markdown.
- Never apologize more than once for the same issue.
</conversation_guidelines>

<examples>
Example 1 — missing order ID:
User: "Where's my order?"
Assistant: "Happy to check on that for you. Could you share your order ID? It starts with 'BK-' and you'll find it in your order confirmation email."
Example 2 — policy question (supported):
User: "How do I reset my password?"
Assistant (after lookup_policy returns the password_reset entry): quote the returned instructions verbatim without adding steps the tool did not mention.
Example 3 — out of scope:
User: "Can you recommend a good mystery novel?"
Assistant: "I can help with order status, returns, and our standard policies, but I'm not able to help with book recommendations. Is there an order or a policy question I can help you with instead?"
Example 4 — ambiguous order:
User: "I want to return my order. My email is sarah@example.com."
Assistant (after lookup_order returns two orders): "I see two orders on your account: BK-10042 (delivered) and BK-10103 (still processing). Which one would you like to return?"
</examples>

<final_checks>
Before you respond, confirm:
- Every factual claim traces to a tool result from THIS conversation, or to <return_policy>.
- If this response would call initiate_return, you have already seen a successful check_return_eligibility for the same order in this conversation.
- If the request is off-topic, you are using the refusal template from <refusal_template> verbatim.
- No markdown. Plain text only.
</final_checks>
"""

# Short "recency" duplicate of the critical rules, appended to the system
# content on every turn (see build_system_content).
CRITICAL_REMINDER = """
Non-negotiable rules for this turn:
- Every factual claim must come from a tool result in THIS conversation or from <return_policy>.
- Do not call initiate_return unless check_return_eligibility succeeded for that order in this conversation.
- Off-topic requests: use the refusal template from <refusal_template> verbatim. Do not engage.
- Plain text only. No markdown.
"""

# Injected once the conversation passes LONG_CONVERSATION_TURN_THRESHOLD.
LONG_CONVERSATION_REMINDER = """
This conversation is getting long. Re-anchor on the rules in <non_negotiable_rules> before you respond. Do not let earlier turns relax the rules.
"""

# Threshold at which the long-conversation reminder gets injected. After this
# many turns, the original system prompt has decayed in effective attention,
# so a shorter, fresher reminder in the highest-position slot helps re-anchor.
LONG_CONVERSATION_TURN_THRESHOLD = 5
def build_system_content(turn_count: int) -> list[dict[str, Any]]:
    """Build the `system` parameter for `messages.create`.

    The large SYSTEM_PROMPT block carries an ephemeral `cache_control` marker
    so it is reused across turns within a session. The reminder blocks stay
    uncached: they vary with `turn_count` and belong in the highest-attention
    slot right before the latest user turn.
    """
    reminders: list[dict[str, Any]] = [{"type": "text", "text": CRITICAL_REMINDER}]
    if turn_count >= LONG_CONVERSATION_TURN_THRESHOLD:
        reminders.append({"type": "text", "text": LONG_CONVERSATION_REMINDER})
    return [
        {
            "type": "text",
            "text": SYSTEM_PROMPT,
            "cache_control": {"type": "ephemeral"},
        },
        *reminders,
    ]
# ---------------------------------------------------------------------------
# Layer 4 -- output validation
# ---------------------------------------------------------------------------
# Order IDs as the prompt describes them: "BK-" followed by digits.
ORDER_ID_RE = re.compile(r"\bBK-\d{4,6}\b")
# ISO-8601 calendar dates (YYYY-MM-DD).
DATE_ISO_RE = re.compile(r"\b\d{4}-\d{2}-\d{2}\b")
# Markdown markers the chat UI cannot render: bold/underline emphasis,
# ATX headers at line start, and bullet-list markers at line start.
MARKDOWN_RE = re.compile(r"(\*\*|__|^#{1,6}\s|^\s*[-*+]\s)", re.MULTILINE)
# Anchored word-boundary patterns for off-topic engagement. These used to be
# substring matches on a small keyword set, which false-positived on plenty
# of legitimate support replies ("I'd recommend contacting..."). The word
# boundaries make matches explicit -- only the intended phrases trip them.
# NOTE: patterns are lowercase because validate_reply matches them against
# the lowercased reply.
OUT_OF_SCOPE_PATTERNS: tuple[re.Pattern[str], ...] = (
    re.compile(r"\bi\s+recommend\b"),
    re.compile(r"\bi\s+suggest\b"),
    re.compile(r"\byou\s+should\s+read\b"),
    re.compile(r"\bgreat\s+book\b"),
    re.compile(r"\bfavorite\s+book\b"),
    re.compile(r"\bwhat\s+should\s+i\s+read\b"),
    re.compile(r"\breview\s+of\b"),
)
# Lowercased substring of the refusal template; its presence marks a reply
# as a proper refusal rather than off-topic engagement.
REFUSAL_PHRASE = "i'm not able to help with"
@dataclass(frozen=True)
class ValidationResult:
    """Outcome of `validate_reply`: pass/fail plus machine-readable violation codes."""

    ok: bool
    violations: tuple[str, ...] = ()
def _collect_grounded_values(
tool_results: list[dict[str, Any]], pattern: re.Pattern[str]
) -> set[str]:
"""Pull every substring matching `pattern` out of the tool result JSON."""
grounded: set[str] = set()
for entry in tool_results:
text = json.dumps(entry.get("result", {}))
grounded.update(pattern.findall(text))
return grounded
def validate_reply(reply: str, tool_results_this_turn: list[dict[str, Any]]) -> ValidationResult:
    """Apply deterministic safety checks to the final assistant reply.

    Best-effort, not exhaustive: flags fabricated order IDs, fabricated ISO
    dates, markdown leakage, and blatant off-topic engagement. Subtler
    failures are left to guardrail layers 1-3.

    Raises:
        TypeError: if `reply` is not a string or `tool_results_this_turn`
            is not a list.
    """
    if not isinstance(reply, str):
        raise TypeError("reply must be a string")
    if not isinstance(tool_results_this_turn, list):
        raise TypeError("tool_results_this_turn must be a list")

    found: list[str] = []

    # Any order ID or ISO date appearing in the reply must also appear in a
    # tool result produced during this turn; otherwise it was fabricated.
    checks = (
        ("ungrounded_order_id", ORDER_ID_RE),
        ("ungrounded_date", DATE_ISO_RE),
    )
    for label, pattern in checks:
        grounded = _collect_grounded_values(tool_results_this_turn, pattern)
        found.extend(
            f"{label}:{candidate}"
            for candidate in pattern.findall(reply)
            if candidate not in grounded
        )

    if MARKDOWN_RE.search(reply):
        found.append("markdown_leaked")

    # Off-topic phrasing is only a violation when the refusal template is
    # absent -- a proper refusal may mention the topic.
    lowered = reply.lower()
    if REFUSAL_PHRASE not in lowered and any(
        pattern.search(lowered) for pattern in OUT_OF_SCOPE_PATTERNS
    ):
        found.append("off_topic_engagement")

    return ValidationResult(ok=not found, violations=tuple(found))
# ---------------------------------------------------------------------------
# Session store
# ---------------------------------------------------------------------------
# Canned replies for the three failure paths in run_turn.

# Returned when Layer 4 validation rejects the model's reply.
SAFE_FALLBACK = (
    "I hit a problem generating a response. Could you rephrase your question, "
    "or share an order ID so I can try again?"
)
# Returned once a session's turn count reaches settings.max_turns_per_session.
CONVERSATION_TOO_LONG_MESSAGE = (
    "This conversation has gone long enough that I need to reset before I keep "
    "making mistakes. Please start a new chat and I'll be happy to help from there."
)
# Returned when the tool-use loop raises ToolLoopLimitExceeded.
TOOL_LOOP_EXCEEDED_MESSAGE = (
    "I got stuck working on that request. Could you try rephrasing it, or share "
    "an order ID so I can try a fresh approach?"
)
@dataclass
class Session:
    """Per-conversation state: message history, tool guard state, turn count."""

    # Anthropic Messages-format dicts (user/assistant roles, including
    # tool_use and tool_result content blocks).
    history: list[dict[str, Any]] = field(default_factory=list)
    # Per-session enforcement state passed to dispatch_tool (see tools.py).
    guard_state: SessionGuardState = field(default_factory=SessionGuardState)
    # Number of completed user turns; drives reminder injection and the
    # conversation-length cap in run_turn.
    turn_count: int = 0
class SessionStore:
    """Bounded in-memory session store with LRU eviction and idle TTL.

    Also owns the per-session locks used to serialize turns for the same
    session_id when FastAPI runs the sync handler in its threadpool. The
    creation of a per-session lock is itself guarded by a class-level lock to
    avoid a double-create race.

    Lock ordering: `_store_lock` may be held while `_locks_lock` is acquired
    (the eviction paths do this), never the reverse -- keep it that way to
    avoid deadlock.

    Designed for a single-process demo deployment. For multi-worker, swap
    this class out for a Redis-backed equivalent.
    """

    def __init__(self, *, max_entries: int, idle_ttl_seconds: int) -> None:
        """Validate the bounds and initialize empty entry and lock tables.

        Raises:
            ValueError: if either bound is not positive.
        """
        if max_entries <= 0:
            raise ValueError("max_entries must be positive")
        if idle_ttl_seconds <= 0:
            raise ValueError("idle_ttl_seconds must be positive")
        self._max_entries = max_entries
        self._idle_ttl_seconds = idle_ttl_seconds
        # session_id -> (Session, last-touched monotonic timestamp).
        # OrderedDict insertion order doubles as LRU order.
        self._entries: OrderedDict[str, tuple[Session, float]] = OrderedDict()
        self._store_lock = threading.Lock()
        self._session_locks: dict[str, threading.Lock] = {}
        self._locks_lock = threading.Lock()

    def get_or_create(self, session_id: str) -> Session:
        """Return the session for `session_id`, creating it if absent.

        Refreshes the entry's timestamp and LRU position, and
        opportunistically evicts expired entries first.
        """
        if not isinstance(session_id, str) or not session_id:
            raise ValueError("session_id is required")
        now = time.monotonic()
        with self._store_lock:
            self._evict_expired_locked(now)
            entry = self._entries.get(session_id)
            if entry is None:
                session = Session()
                self._entries[session_id] = (session, now)
                self._enforce_size_cap_locked()
                return session
            session, _ = entry
            # Refresh the timestamp, then move to the MRU end. Assigning to
            # an existing key does not reorder an OrderedDict, hence the
            # explicit move_to_end.
            self._entries[session_id] = (session, now)
            self._entries.move_to_end(session_id)
            return session

    def lock_for(self, session_id: str) -> threading.Lock:
        """Return the lock guarding turns for `session_id`, creating if new."""
        if not isinstance(session_id, str) or not session_id:
            raise ValueError("session_id is required")
        with self._locks_lock:
            lock = self._session_locks.get(session_id)
            if lock is None:
                lock = threading.Lock()
                self._session_locks[session_id] = lock
            return lock

    def clear(self) -> None:
        """Drop all sessions. Intended for tests and admin operations only."""
        # The two locks are taken sequentially, not nested.
        with self._store_lock:
            self._entries.clear()
        with self._locks_lock:
            self._session_locks.clear()

    def __len__(self) -> int:
        """Number of live (non-evicted) sessions."""
        with self._store_lock:
            return len(self._entries)

    def __contains__(self, session_id: object) -> bool:
        """Membership test. Read-only: does not refresh LRU order or evict."""
        if not isinstance(session_id, str):
            return False
        with self._store_lock:
            return session_id in self._entries

    def __getitem__(self, session_id: str) -> Session:
        """Fetch an existing session. Read-only: no LRU refresh, no eviction.

        Raises:
            KeyError: if `session_id` has no stored session.
        """
        with self._store_lock:
            entry = self._entries.get(session_id)
            if entry is None:
                raise KeyError(session_id)
            return entry[0]

    def _evict_expired_locked(self, now: float) -> None:
        """Drop entries idle past the TTL. Caller must hold `_store_lock`."""
        expired = [
            sid for sid, (_, last) in self._entries.items() if now - last > self._idle_ttl_seconds
        ]
        for sid in expired:
            del self._entries[sid]
            # Drop the per-session lock too so the lock table cannot grow
            # without bound.
            with self._locks_lock:
                self._session_locks.pop(sid, None)

    def _enforce_size_cap_locked(self) -> None:
        """Evict LRU entries past `max_entries`. Caller must hold `_store_lock`."""
        while len(self._entries) > self._max_entries:
            sid, _ = self._entries.popitem(last=False)
            with self._locks_lock:
                self._session_locks.pop(sid, None)
# Process-wide session store singleton, sized from configuration.
SESSIONS = SessionStore(
    max_entries=settings.session_store_max_entries,
    idle_ttl_seconds=settings.session_idle_ttl_seconds,
)
# ---------------------------------------------------------------------------
# Anthropic client
# ---------------------------------------------------------------------------
@functools.lru_cache(maxsize=1)
def _get_client() -> Anthropic:
    """Build (once) and return the process-wide Anthropic client.

    The single cached instance keeps one HTTP connection pool alive across
    turns. Tests replace this function with
    `monkeypatch.setattr(agent, "_get_client", ...)`.
    """
    api_key = settings.anthropic_api_key.get_secret_value()
    return Anthropic(api_key=api_key, timeout=settings.anthropic_timeout_seconds)
# ---------------------------------------------------------------------------
# Content serialization helpers
# ---------------------------------------------------------------------------
def _extract_text(content_blocks: list[Any]) -> str:
parts: list[str] = []
for block in content_blocks:
if getattr(block, "type", None) == "text":
parts.append(getattr(block, "text", ""))
return "".join(parts).strip()
def _serialize_assistant_content(content_blocks: list[Any]) -> list[dict[str, Any]]:
"""Convert SDK content blocks back into JSON-serializable dicts for history."""
serialized: list[dict[str, Any]] = []
for block in content_blocks:
block_type = getattr(block, "type", None)
if block_type == "text":
serialized.append({"type": "text", "text": getattr(block, "text", "")})
elif block_type == "tool_use":
serialized.append(
{
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": getattr(block, "input", None) or {},
}
)
return serialized
def _with_last_message_cache_breakpoint(
messages: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Return a shallow-copied message list with a cache breakpoint on the last block.
Marking the last content block with `cache_control: ephemeral` extends the
prompt cache through the full conversation history so prior turns do not
need to be re-tokenized on every call. We avoid mutating the stored history
because the stored form should stay canonical.
"""
if not messages:
return messages
head = messages[:-1]
last = dict(messages[-1])
content = last.get("content")
if isinstance(content, str):
last["content"] = [
{"type": "text", "text": content, "cache_control": {"type": "ephemeral"}}
]
elif isinstance(content, list) and content:
new_content = [dict(block) for block in content]
new_content[-1] = {**new_content[-1], "cache_control": {"type": "ephemeral"}}
last["content"] = new_content
return head + [last]
def _hash_for_logging(value: str) -> str:
"""Short stable hash for log correlation without leaking content."""
return hashlib.sha256(value.encode("utf-8")).hexdigest()[:16]
# ---------------------------------------------------------------------------
# Agent loop
# ---------------------------------------------------------------------------
class ToolLoopLimitExceeded(RuntimeError):
    """Raised when the tool-use loop hits `settings.max_tool_use_iterations`.

    Structural guard against a model/tool ping-pong that never terminates;
    `run_turn` catches it and returns `TOOL_LOOP_EXCEEDED_MESSAGE` instead of
    surfacing the exception to the caller.
    """
def _run_tool_use_loop(
    session: Session,
    system_content: list[dict[str, Any]],
    client: Anthropic,
) -> tuple[Any, list[dict[str, Any]]]:
    """Call the model repeatedly until it produces a non-tool-use response.

    Returns the final Anthropic response object plus every tool result
    produced during the turn; the latter feeds Layer 4 validation, which
    checks the final reply for ungrounded claims.

    Raises:
        ToolLoopLimitExceeded: the model was still requesting tools after
            `settings.max_tool_use_iterations` iterations. This is the
            structural guard that prevents one bad request from burning API
            credit in an infinite loop.
    """
    collected_results: list[dict[str, Any]] = []

    def _call_model() -> Any:
        # Every call places a cache breakpoint on the last history block so
        # the prompt cache covers the whole conversation prefix.
        return client.messages.create(
            model=settings.anthropic_model,
            max_tokens=settings.max_tokens,
            system=system_content,
            tools=TOOL_SCHEMAS,
            messages=_with_last_message_cache_breakpoint(session.history),
        )

    response = _call_model()
    for _ in range(settings.max_tool_use_iterations):
        if getattr(response, "stop_reason", None) != "tool_use":
            return response, collected_results

        # Persist the assistant's tool-use request, execute each requested
        # tool, and echo the results back as a single user message.
        session.history.append(
            {"role": "assistant", "content": _serialize_assistant_content(response.content)}
        )
        result_blocks: list[dict[str, Any]] = []
        for block in response.content:
            if getattr(block, "type", None) != "tool_use":
                continue
            tool_args = getattr(block, "input", None) or {}
            outcome = dispatch_tool(block.name, tool_args, session.guard_state)
            collected_results.append({"name": block.name, "result": outcome})
            result_blocks.append(
                {
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(outcome, ensure_ascii=False),
                }
            )
        session.history.append({"role": "user", "content": result_blocks})
        response = _call_model()

    # The model is still asking for tools after the iteration budget --
    # refuse the request rather than loop forever.
    raise ToolLoopLimitExceeded(
        f"Tool-use loop did not terminate within {settings.max_tool_use_iterations} iterations"
    )
def run_turn(session_id: str, user_message: str) -> str:
    """Execute one user turn end-to-end and return the assistant's reply text.

    Orchestrates: per-session locking, session lookup, history append, system
    content with reminders, the tool-use loop, Layer 4 output validation, and
    the safe-fallback path when validation rejects the reply.

    Raises:
        ValueError: if `session_id` is empty/non-string or `user_message` is
            empty/whitespace/non-string.
    """
    if not isinstance(session_id, str) or not session_id:
        raise ValueError("session_id is required")
    if not isinstance(user_message, str) or not user_message.strip():
        raise ValueError("user_message is required")

    with SESSIONS.lock_for(session_id):
        session = SESSIONS.get_or_create(session_id)

        # Hard cap on conversation length: refusing past the limit keeps
        # memory usage predictable and avoids the model losing coherence
        # late in a chat.
        if session.turn_count >= settings.max_turns_per_session:
            return CONVERSATION_TOO_LONG_MESSAGE

        session.history.append({"role": "user", "content": user_message})
        system_content = build_system_content(session.turn_count)
        client = _get_client()
        try:
            response, turn_tool_results = _run_tool_use_loop(
                session, system_content, client
            )
        except ToolLoopLimitExceeded:
            logger.warning(
                "tool_loop_exceeded session=%s turn=%s",
                _hash_for_logging(session_id),
                session.turn_count,
            )
            session.turn_count += 1
            return TOOL_LOOP_EXCEEDED_MESSAGE

        reply_text = _extract_text(response.content)
        verdict = validate_reply(reply_text, turn_tool_results)
        if not verdict.ok:
            # Redact PII: log only violation codes plus hashes of session
            # and reply. The reply body may contain customer name, email,
            # order ID, or tracking number, so it is never logged.
            logger.warning(
                "validation_failed session=%s turn=%s violations=%s reply_sha=%s",
                _hash_for_logging(session_id),
                session.turn_count,
                list(verdict.violations),
                _hash_for_logging(reply_text),
            )
            # The rejected reply is deliberately NOT appended to history,
            # so it cannot poison future turns.
            session.turn_count += 1
            return SAFE_FALLBACK

        session.history.append(
            {"role": "assistant", "content": _serialize_assistant_content(response.content)}
        )
        session.turn_count += 1
        return reply_text