bookly/tests/test_tools.py
Cody Borders 3947180841 Harden security/perf, add literate program at /architecture
Security and performance fixes addressing a comprehensive review:

- Server-issued HMAC-signed session cookies; client-supplied session_id
  ignored. Prevents session hijacking via body substitution.
- Sliding-window rate limiter per IP and per session.
- SessionStore with LRU eviction, idle TTL, per-session threading locks,
  and a hard turn cap. Bounds memory and serializes concurrent turns for
  the same session so FastAPI's threadpool cannot corrupt history.
- Tool-use loop capped at settings.max_tool_use_iterations; Anthropic
  client gets an explicit timeout. No more infinite-loop credit burn.
- Every tool argument is regex-validated, length-capped, and stripped
  of control characters. `assert` statements are replaced with explicit
  `raise ValueError` checks so that running Python with -O cannot
  silently disable them.
- PII-safe warning logs: session IDs and reply bodies are hashed, never
  logged in clear.
- hmac.compare_digest for email comparison (constant-time).
- Strict Content-Security-Policy plus X-Content-Type-Options,
  X-Frame-Options, Referrer-Policy, Permissions-Policy via middleware.
- Explicit handlers for anthropic.RateLimitError, APIConnectionError,
  APIStatusError, ValueError; static dir resolved from __file__.
- Prompt cache breakpoints on the last tool schema and the last message
  so per-turn input cost scales linearly, not quadratically.
- TypedDict handler argument shapes; direct block.name/block.id access.
- functools.lru_cache on _get_client.
- Anchored word-boundary regexes for out-of-scope detection to kill
  false positives on phrases like "I'd recommend contacting...".

Literate program:

- Bookly.lit.md is now the single source of truth for the five core
  Python files. Tangles byte-for-byte; verified via tangle.ts --verify.
- Prose walkthrough, three mermaid diagrams, narrative per module.
- Woven to static/architecture.html with the app's palette
  (background #f5f3ee) via scripts/architecture-header.html.
- New GET /architecture route serves the HTML with a relaxed CSP that
  allows pandoc's inline styles. Available at
  bookly.codyborders.com/architecture.
- scripts/rebuild_architecture_html.sh regenerates the HTML after edits.
- code_reviews/2026-04-15-1433-code-review.md captures the review that
  drove these changes.

All 37 tests pass.
2026-04-15 15:02:40 -07:00

254 lines
7.8 KiB
Python

"""Tool-handler tests covering Layer 3 enforcement and the privacy boundary.
Goal: verify that the tools, on their own, refuse the unsafe operations even
if the model ignores every system-prompt rule. The model never appears in
these tests — only the deterministic handlers and the per-session guard state.
"""
import pytest
from mock_data import POLICIES, RETURNS
from tools import SessionGuardState, dispatch_tool
@pytest.fixture(autouse=True)
def _reset_returns():
    """Empty the shared RETURNS store around every test.

    The store is cleared once before the test body runs and once more
    after it finishes, so returns created by one test are not visible to
    the next.
    """
    RETURNS.clear()  # setup: start from an empty store
    yield
    RETURNS.clear()  # teardown: drop anything the test created
@pytest.fixture
def state() -> SessionGuardState:
    """Provide a newly constructed SessionGuardState to each test that asks for it."""
    fresh_state = SessionGuardState()
    return fresh_state
def test_lookup_order_returns_order_for_known_id(state):
    """Looking up a known order ID returns the matching order record."""
    arguments = {"order_id": "BK-10042"}
    response = dispatch_tool("lookup_order", arguments, state)
    assert "order" in response
    order = response["order"]
    assert order["customer_name"] == "Sarah Chen"
def test_lookup_order_unknown_id_returns_not_found(state):
    """A well-formed ID with no matching order yields ``order_not_found``."""
    response = dispatch_tool("lookup_order", {"order_id": "BK-99999"}, state)
    error_code = response.get("error")
    assert error_code == "order_not_found"
def test_lookup_order_email_mismatch_masquerades_as_not_found(state):
    """Privacy: the wrong email for a real order must report the same error
    code as a missing order, so callers cannot probe which IDs exist."""
    arguments = {"order_id": "BK-10042", "customer_email": "wrong@example.com"}
    response = dispatch_tool("lookup_order", arguments, state)
    assert response.get("error") == "order_not_found"
def test_lookup_order_email_match_returns_order(state):
    """Supplying the order's own email returns the order.

    NOTE(review): the address is written in mixed case while other tests use
    the all-lowercase form, so this presumably also exercises
    case-insensitive email matching -- confirm against the handler.
    """
    arguments = {
        "order_id": "BK-10042",
        "customer_email": "Sarah.Chen@example.com",
    }
    response = dispatch_tool("lookup_order", arguments, state)
    assert "order" in response
def test_eligibility_check_passes_for_recent_delivered_order(state):
    """A passing eligibility check reports ``eligible`` and is recorded on
    the session guard state."""
    order_id = "BK-10042"
    arguments = {"order_id": order_id, "customer_email": "sarah.chen@example.com"}
    outcome = dispatch_tool("check_return_eligibility", arguments, state)
    assert outcome["eligible"] is True
    # The guard state must remember the pass for this order.
    assert order_id in state.eligibility_checks_passed
def test_eligibility_check_rejects_past_window(state):
    """An order outside the return window is ineligible, is not recorded as
    passed, and the reason mentions the 30-day policy."""
    order_id = "BK-9871"
    arguments = {
        "order_id": order_id,
        "customer_email": "maria.gonzalez@example.com",
    }
    outcome = dispatch_tool("check_return_eligibility", arguments, state)
    assert outcome["eligible"] is False
    assert order_id not in state.eligibility_checks_passed
    assert "30-day" in outcome["reason"]
def test_eligibility_check_rejects_not_yet_delivered(state):
    """An order whose rejection reason mentions "shipped" is not eligible.

    Besides the response payload, this verifies that the rejected check
    leaves no entry in ``state.eligibility_checks_passed`` -- the same
    invariant the past-window rejection test asserts -- so a later
    ``initiate_return`` cannot rely on a check that did not pass.
    """
    order_id = "BK-10089"
    result = dispatch_tool(
        "check_return_eligibility",
        {"order_id": order_id, "customer_email": "james.murphy@example.com"},
        state,
    )
    assert result["eligible"] is False
    assert "shipped" in result["reason"]
    # A rejected check must not unlock the return flow for this order.
    assert order_id not in state.eligibility_checks_passed
def test_eligibility_check_email_mismatch_returns_auth_failed(state):
    """A wrong email on an eligibility check yields ``auth_failed``.

    Also verifies that the failed authentication does not record the order
    in ``state.eligibility_checks_passed``; otherwise a caller with the
    wrong email could unlock ``initiate_return`` for someone else's order.
    """
    order_id = "BK-10042"
    result = dispatch_tool(
        "check_return_eligibility",
        {"order_id": order_id, "customer_email": "wrong@example.com"},
        state,
    )
    assert result.get("error") == "auth_failed"
    # Authentication failure must leave the guard state untouched.
    assert order_id not in state.eligibility_checks_passed
def test_initiate_return_refuses_without_prior_eligibility_check(state):
    """Layer 3 protocol guard: the most important guardrail in the system.

    Calling ``initiate_return`` on a fresh session, with no eligibility
    check beforehand, must be refused and must not create a return.
    """
    arguments = {
        "order_id": "BK-10042",
        "customer_email": "sarah.chen@example.com",
        "reason": "Bought by mistake",
    }
    response = dispatch_tool("initiate_return", arguments, state)
    assert response.get("error") == "eligibility_not_verified"
    # Nothing may have been written to the returns store.
    assert not RETURNS
def test_initiate_return_succeeds_after_eligibility_check(state):
    """Once eligibility has been checked, ``initiate_return`` creates a
    return, records it on the session, and stores it under its RMA id."""
    order_id = "BK-10042"
    email = "sarah.chen@example.com"

    # Step 1: run the eligibility check that unlocks the return.
    check_args = {"order_id": order_id, "customer_email": email}
    dispatch_tool("check_return_eligibility", check_args, state)

    # Step 2: initiate the return itself.
    return_args = {
        "order_id": order_id,
        "customer_email": email,
        "reason": "Bought by mistake",
    }
    outcome = dispatch_tool("initiate_return", return_args, state)

    assert "return_id" in outcome
    return_id = outcome["return_id"]
    assert return_id.startswith("RMA-")
    assert order_id in state.returns_initiated
    assert return_id in RETURNS
def test_initiate_return_refuses_duplicate(state):
    """A second ``initiate_return`` for the same order and session is refused.

    The first call is asserted to succeed so the test cannot pass for the
    wrong reason. The returns store is then checked to hold exactly one
    entry, so the refused duplicate is known not to have written a second
    record.
    """
    order_id = "BK-10042"
    email = "sarah.chen@example.com"
    return_args = {
        "order_id": order_id,
        "customer_email": email,
        "reason": "Bought by mistake",
    }

    dispatch_tool(
        "check_return_eligibility",
        {"order_id": order_id, "customer_email": email},
        state,
    )

    # Each call gets its own copy so the handler never sees a shared dict.
    first = dispatch_tool("initiate_return", dict(return_args), state)
    assert "return_id" in first

    second = dispatch_tool("initiate_return", dict(return_args), state)
    assert second.get("error") == "already_initiated"
    assert "return_id" not in second
    # The autouse fixture emptied RETURNS, so only the first return may exist.
    assert len(RETURNS) == 1
def test_lookup_policy_returns_verbatim_text(state):
    """The policy tool returns the stored policy text unchanged."""
    topic = "password_reset"
    response = dispatch_tool("lookup_policy", {"topic": topic}, state)
    assert response["text"] == POLICIES[topic]
def test_lookup_policy_unknown_topic_returns_not_supported(state):
    """An unsupported topic is refused and the response lists the topics
    that are available."""
    response = dispatch_tool("lookup_policy", {"topic": "loyalty_program"}, state)
    assert response.get("error") == "topic_not_supported"
    offered_topics = response["available_topics"]
    assert "shipping" in offered_topics
def test_lookup_policy_topic_is_case_insensitive(state):
    """An all-uppercase topic resolves to the same text as its lowercase key."""
    response = dispatch_tool("lookup_policy", {"topic": "SHIPPING"}, state)
    expected_text = POLICIES["shipping"]
    assert response["text"] == expected_text
def test_dispatch_unknown_tool_returns_error(state):
    """Dispatching a tool name that is not registered yields ``unknown_tool``."""
    response = dispatch_tool("delete_account", {}, state)
    error_code = response.get("error")
    assert error_code == "unknown_tool"
def test_dispatch_rejects_non_dict_arguments(state):
    """Passing a bare string instead of an arguments dict is a validation error."""
    bad_arguments = "BK-10042"
    response = dispatch_tool("lookup_order", bad_arguments, state)  # type: ignore[arg-type]
    assert response.get("error") == "invalid_arguments"
def test_lookup_order_rejects_malformed_order_id(state):
    """A badly formatted order ID is a validation error whose message names
    the offending field."""
    arguments = {"order_id": "not-a-real-id"}
    response = dispatch_tool("lookup_order", arguments, state)
    assert response.get("error") == "invalid_arguments"
    assert "order_id" in response["message"]
def test_lookup_order_strips_control_characters(state):
    """Control chars in free-form input must never reach tool storage."""
    # NUL and BEL are appended to an otherwise valid ID.
    arguments = {"order_id": "BK-10042\x00\x07"}
    response = dispatch_tool("lookup_order", arguments, state)
    # Once the control chars are stripped, "BK-10042" matches the regex
    # and the lookup succeeds.
    assert "order" in response
def test_check_return_eligibility_rejects_malformed_email(state):
    """A string that is not shaped like an email address is a validation error."""
    arguments = {"order_id": "BK-10042", "customer_email": "not-an-email"}
    response = dispatch_tool("check_return_eligibility", arguments, state)
    error_code = response.get("error")
    assert error_code == "invalid_arguments"
def test_initiate_return_rejects_empty_item_titles_list(state):
    """An explicit empty ``item_titles`` list is refused with ``no_items_selected``.

    Eligibility is checked first so the refusal can only come from the
    empty selection. Like the Layer 3 refusal test, this also asserts that
    the refused call created no return.
    """
    order_id = "BK-10042"
    email = "sarah.chen@example.com"
    dispatch_tool(
        "check_return_eligibility",
        {"order_id": order_id, "customer_email": email},
        state,
    )
    result = dispatch_tool(
        "initiate_return",
        {
            "order_id": order_id,
            "customer_email": email,
            "reason": "Bought by mistake",
            "item_titles": [],
        },
        state,
    )
    assert result.get("error") == "no_items_selected"
    # A refused request must not leave a return behind.
    assert not RETURNS
    assert order_id not in state.returns_initiated
def test_initiate_return_rejects_overlong_reason(state):
    """A 5000-character ``reason`` is refused with ``invalid_arguments``.

    Eligibility is checked first so the refusal can only come from argument
    validation. Like the Layer 3 refusal test, this also asserts that the
    refused call created no return.
    """
    order_id = "BK-10042"
    email = "sarah.chen@example.com"
    dispatch_tool(
        "check_return_eligibility",
        {"order_id": order_id, "customer_email": email},
        state,
    )
    result = dispatch_tool(
        "initiate_return",
        {
            "order_id": order_id,
            "customer_email": email,
            "reason": "x" * 5000,
        },
        state,
    )
    assert result.get("error") == "invalid_arguments"
    # A refused request must not leave a return behind.
    assert not RETURNS
    assert order_id not in state.returns_initiated
def test_lookup_policy_rejects_uppercase_and_punctuation(state):
    """Topic must normalize to lowercase underscores; anything else is a
    validation error so nothing unexpected makes it into tool result JSON.

    Only the punctuation case is exercised here ("shipping!"). Uppercase
    input on its own is accepted, as
    test_lookup_policy_topic_is_case_insensitive shows.
    """
    arguments = {"topic": "shipping!"}
    response = dispatch_tool("lookup_policy", arguments, state)
    assert response.get("error") == "invalid_arguments"