Security and performance fixes addressing a comprehensive review:

- Server-issued HMAC-signed session cookies; client-supplied session_id ignored. Prevents session hijacking via body substitution.
- Sliding-window rate limiter per IP and per session.
- SessionStore with LRU eviction, idle TTL, per-session threading locks, and a hard turn cap. Bounds memory and serializes concurrent turns for the same session so FastAPI's threadpool cannot corrupt history.
- Tool-use loop capped at settings.max_tool_use_iterations; Anthropic client gets an explicit timeout. No more infinite-loop credit burn.
- Every tool argument is regex-validated, length-capped, and control-character-stripped. asserts replaced with ValueError so -O cannot silently disable the checks.
- PII-safe warning logs: session IDs and reply bodies are hashed, never logged in clear.
- hmac.compare_digest for email comparison (constant-time).
- Strict Content-Security-Policy plus X-Content-Type-Options, X-Frame-Options, Referrer-Policy, Permissions-Policy via middleware.
- Explicit handlers for anthropic.RateLimitError, APIConnectionError, APIStatusError, ValueError; static dir resolved from __file__.
- Prompt cache breakpoints on the last tool schema and the last message so per-turn input cost scales linearly, not quadratically.
- TypedDict handler argument shapes; direct block.name/block.id access.
- functools.lru_cache on _get_client.
- Anchored word-boundary regexes for out-of-scope detection to kill false positives on phrases like "I'd recommend contacting...".

Literate program:

- Bookly.lit.md is now the single source of truth for the five core Python files. Tangles byte-for-byte; verified via tangle.ts --verify.
- Prose walkthrough, three mermaid diagrams, narrative per module.
- Woven to static/architecture.html with the app's palette (background #f5f3ee) via scripts/architecture-header.html.
- New GET /architecture route serves the HTML with a relaxed CSP that allows pandoc's inline styles. Available at bookly.codyborders.com/architecture.
- scripts/rebuild_architecture_html.sh regenerates the HTML after edits.
- code_reviews/2026-04-15-1433-code-review.md captures the review that drove these changes.

All 37 tests pass.
254 lines
7.8 KiB
Python
254 lines
7.8 KiB
Python
"""Tool-handler tests covering Layer 3 enforcement and the privacy boundary.
|
|
|
|
Goal: verify that the tools, on their own, refuse the unsafe operations even
|
|
if the model ignores every system-prompt rule. The model never appears in
|
|
these tests — only the deterministic handlers and the per-session guard state.
|
|
"""
|
|
|
|
import pytest
|
|
|
|
from mock_data import POLICIES, RETURNS
|
|
from tools import SessionGuardState, dispatch_tool
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _reset_returns():
    """Empty the shared ``RETURNS`` mapping before and after every test.

    Declared ``autouse`` so each test in this module starts with no stored
    returns and leaves none behind for the next one.
    """
    # Setup: drop anything a previous test (or import-time code) left behind.
    RETURNS.clear()
    yield
    # Teardown: do not leak this test's returns into whatever runs next.
    RETURNS.clear()
|
|
|
|
|
|
@pytest.fixture
def state() -> SessionGuardState:
    """Provide a brand-new ``SessionGuardState`` for each requesting test."""
    fresh_state = SessionGuardState()
    return fresh_state
|
|
|
|
|
|
def test_lookup_order_returns_order_for_known_id(state):
    """Looking up BK-10042 without an email returns that order's record."""
    lookup_args = {"order_id": "BK-10042"}
    response = dispatch_tool("lookup_order", lookup_args, state)

    assert "order" in response
    order = response["order"]
    assert order["customer_name"] == "Sarah Chen"
|
|
|
|
|
|
def test_lookup_order_unknown_id_returns_not_found(state):
    """An order ID the test expects to be absent yields ``order_not_found``."""
    response = dispatch_tool("lookup_order", {"order_id": "BK-99999"}, state)

    error_code = response.get("error")
    assert error_code == "order_not_found"
|
|
|
|
|
|
def test_lookup_order_email_mismatch_masquerades_as_not_found(state):
    """Privacy: a wrong email is reported with the same error code as a
    missing order, so callers cannot enumerate which IDs exist on the system.
    """
    mismatched_args = {
        "order_id": "BK-10042",
        "customer_email": "wrong@example.com",
    }
    response = dispatch_tool("lookup_order", mismatched_args, state)

    assert response.get("error") == "order_not_found"
|
|
|
|
|
|
def test_lookup_order_email_match_returns_order(state):
    """Supplying the matching customer email still returns the order.

    The address is deliberately mixed-case here, whereas the other tests in
    this module use the all-lowercase form for the same order.
    """
    matching_args = {
        "order_id": "BK-10042",
        "customer_email": "Sarah.Chen@example.com",
    }
    response = dispatch_tool("lookup_order", matching_args, state)

    assert "order" in response
|
|
|
|
|
|
def test_eligibility_check_passes_for_recent_delivered_order(state):
    """A passing check reports ``eligible`` and records the order in state."""
    order_id = "BK-10042"
    check_args = {
        "order_id": order_id,
        "customer_email": "sarah.chen@example.com",
    }
    response = dispatch_tool("check_return_eligibility", check_args, state)

    assert response["eligible"] is True
    # The guard state must remember the pass for this specific order.
    assert order_id in state.eligibility_checks_passed
|
|
|
|
|
|
def test_eligibility_check_rejects_past_window(state):
    """BK-9871 is reported ineligible with a reason mentioning "30-day",
    and the order is not recorded as having passed the check.
    """
    order_id = "BK-9871"
    check_args = {
        "order_id": order_id,
        "customer_email": "maria.gonzalez@example.com",
    }
    response = dispatch_tool("check_return_eligibility", check_args, state)

    assert response["eligible"] is False
    assert order_id not in state.eligibility_checks_passed
    assert "30-day" in response["reason"]
|
|
|
|
|
|
def test_eligibility_check_rejects_not_yet_delivered(state):
    """BK-10089 is reported ineligible with a reason mentioning "shipped".

    Like the past-window case, a rejected check must not add the order to
    ``state.eligibility_checks_passed``.
    """
    order_id = "BK-10089"
    check_args = {
        "order_id": order_id,
        "customer_email": "james.murphy@example.com",
    }
    result = dispatch_tool("check_return_eligibility", check_args, state)

    assert result["eligible"] is False
    # Mirror the past-window test: a rejection must leave the guard state clean.
    assert order_id not in state.eligibility_checks_passed
    assert "shipped" in result["reason"]
|
|
|
|
|
|
def test_eligibility_check_email_mismatch_returns_auth_failed(state):
    """A wrong email on an eligibility check yields ``auth_failed``.

    The failed check must also leave ``state.eligibility_checks_passed``
    without the order, so the failure cannot be followed by a return.
    """
    order_id = "BK-10042"
    check_args = {
        "order_id": order_id,
        "customer_email": "wrong@example.com",
    }
    result = dispatch_tool("check_return_eligibility", check_args, state)

    assert result.get("error") == "auth_failed"
    # The error code alone is not enough: the guard state must stay locked.
    assert order_id not in state.eligibility_checks_passed
|
|
|
|
|
|
def test_initiate_return_refuses_without_prior_eligibility_check(state):
    """Layer 3 protocol guard: the most important guardrail in the system.

    With no eligibility check recorded in ``state``, ``initiate_return`` must
    answer ``eligibility_not_verified`` and store nothing in ``RETURNS``.
    """
    return_args = {
        "order_id": "BK-10042",
        "customer_email": "sarah.chen@example.com",
        "reason": "Bought by mistake",
    }
    response = dispatch_tool("initiate_return", return_args, state)

    assert response.get("error") == "eligibility_not_verified"
    assert not RETURNS
|
|
|
|
|
|
def test_initiate_return_succeeds_after_eligibility_check(state):
    """After a passing eligibility check, ``initiate_return`` creates a return.

    The result carries an ``RMA-`` prefixed ``return_id``, the order is
    recorded in ``state.returns_initiated``, and the ID is a key of
    ``RETURNS``.
    """
    order_id = "BK-10042"
    email = "sarah.chen@example.com"

    # Step 1: unlock the order by passing the eligibility check.
    dispatch_tool(
        "check_return_eligibility",
        {"order_id": order_id, "customer_email": email},
        state,
    )

    # Step 2: the return itself.
    response = dispatch_tool(
        "initiate_return",
        {
            "order_id": order_id,
            "customer_email": email,
            "reason": "Bought by mistake",
        },
        state,
    )

    assert "return_id" in response
    return_id = response["return_id"]
    assert return_id.startswith("RMA-")
    assert order_id in state.returns_initiated
    assert return_id in RETURNS
|
|
|
|
|
|
def test_initiate_return_refuses_duplicate(state):
    """A second ``initiate_return`` for the same order is refused.

    The first call is asserted to succeed, because otherwise the second
    call's error would not demonstrate duplicate protection. The refused
    call must also leave the set of stored return IDs unchanged.
    """
    eligibility_args = {
        "order_id": "BK-10042",
        "customer_email": "sarah.chen@example.com",
    }
    return_args = {
        "order_id": "BK-10042",
        "customer_email": "sarah.chen@example.com",
        "reason": "Bought by mistake",
    }

    dispatch_tool("check_return_eligibility", eligibility_args, state)

    # Pass a copy per call so each invocation gets its own argument dict,
    # exactly as the separate literals in the original test did.
    first = dispatch_tool("initiate_return", dict(return_args), state)
    assert "return_id" in first
    stored_ids = set(RETURNS)

    second = dispatch_tool("initiate_return", dict(return_args), state)
    assert second.get("error") == "already_initiated"
    # A refusal must not create another return record.
    assert set(RETURNS) == stored_ids
|
|
|
|
|
|
def test_lookup_policy_returns_verbatim_text(state):
    """The ``text`` returned for a topic equals the ``POLICIES`` entry exactly."""
    topic = "password_reset"
    response = dispatch_tool("lookup_policy", {"topic": topic}, state)

    expected_text = POLICIES["password_reset"]
    assert response["text"] == expected_text
|
|
|
|
|
|
def test_lookup_policy_unknown_topic_returns_not_supported(state):
    """An unsupported topic yields ``topic_not_supported`` plus a list of
    available topics that includes "shipping".
    """
    response = dispatch_tool(
        "lookup_policy",
        {"topic": "loyalty_program"},
        state,
    )

    assert response.get("error") == "topic_not_supported"
    offered_topics = response["available_topics"]
    assert "shipping" in offered_topics
|
|
|
|
|
|
def test_lookup_policy_topic_is_case_insensitive(state):
    """An upper-case topic resolves to the same text as its lower-case key."""
    shouted_topic = {"topic": "SHIPPING"}
    response = dispatch_tool("lookup_policy", shouted_topic, state)

    assert response["text"] == POLICIES["shipping"]
|
|
|
|
|
|
def test_dispatch_unknown_tool_returns_error(state):
    """Dispatching a tool name that is not registered yields ``unknown_tool``."""
    no_arguments = {}
    response = dispatch_tool("delete_account", no_arguments, state)

    error_code = response.get("error")
    assert error_code == "unknown_tool"
|
|
|
|
|
|
def test_dispatch_rejects_non_dict_arguments(state):
    """Passing a bare string instead of an argument dict is rejected with
    ``invalid_arguments``.
    """
    not_a_dict = "BK-10042"
    response = dispatch_tool("lookup_order", not_a_dict, state)  # type: ignore[arg-type]

    assert response.get("error") == "invalid_arguments"
|
|
|
|
|
|
def test_lookup_order_rejects_malformed_order_id(state):
    """A malformed order ID yields ``invalid_arguments`` and a message that
    names the ``order_id`` field.
    """
    malformed_args = {"order_id": "not-a-real-id"}
    response = dispatch_tool("lookup_order", malformed_args, state)

    assert response.get("error") == "invalid_arguments"
    message = response["message"]
    assert "order_id" in message
|
|
|
|
|
|
def test_lookup_order_strips_control_characters(state):
    """Control chars in free-form input must never reach tool storage."""
    polluted_args = {"order_id": "BK-10042\x00\x07"}
    response = dispatch_tool("lookup_order", polluted_args, state)

    # With the NUL and BEL characters removed, what remains is "BK-10042",
    # a valid ID, so the lookup is expected to succeed.
    assert "order" in response
|
|
|
|
|
|
def test_check_return_eligibility_rejects_malformed_email(state):
    """A ``customer_email`` that is not an email address is rejected with
    ``invalid_arguments``.
    """
    bad_email_args = {
        "order_id": "BK-10042",
        "customer_email": "not-an-email",
    }
    response = dispatch_tool("check_return_eligibility", bad_email_args, state)

    assert response.get("error") == "invalid_arguments"
|
|
|
|
|
|
def test_initiate_return_rejects_empty_item_titles_list(state):
    """An explicit empty ``item_titles`` list yields ``no_items_selected``,
    even after the eligibility check for the order has been run.
    """
    order_id = "BK-10042"
    email = "sarah.chen@example.com"

    # Run the eligibility check first so the request gets past that guard.
    dispatch_tool(
        "check_return_eligibility",
        {"order_id": order_id, "customer_email": email},
        state,
    )

    return_args = {
        "order_id": order_id,
        "customer_email": email,
        "reason": "Bought by mistake",
        "item_titles": [],
    }
    response = dispatch_tool("initiate_return", return_args, state)

    assert response.get("error") == "no_items_selected"
|
|
|
|
|
|
def test_initiate_return_rejects_overlong_reason(state):
    """A 5000-character ``reason`` is rejected with ``invalid_arguments``,
    even after the eligibility check for the order has been run.
    """
    order_id = "BK-10042"
    email = "sarah.chen@example.com"

    # Run the eligibility check first so the request gets past that guard.
    dispatch_tool(
        "check_return_eligibility",
        {"order_id": order_id, "customer_email": email},
        state,
    )

    oversized_reason = "x" * 5000
    return_args = {
        "order_id": order_id,
        "customer_email": email,
        "reason": oversized_reason,
    }
    response = dispatch_tool("initiate_return", return_args, state)

    assert response.get("error") == "invalid_arguments"
|
|
|
|
|
|
def test_lookup_policy_rejects_uppercase_and_punctuation(state):
    """Topic must normalize to lowercase underscores; anything else is a
    validation error so nothing unexpected makes it into tool result JSON.

    Only the punctuation case ("shipping!") is exercised here. An all-caps
    topic is accepted, as test_lookup_policy_topic_is_case_insensitive shows.
    """
    punctuated_topic = {"topic": "shipping!"}
    response = dispatch_tool("lookup_policy", punctuated_topic, state)

    assert response.get("error") == "invalid_arguments"
|