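"""
Pertinent Action Auditing — AgenticState Interception Pattern
=============================================================

Developed as a verification pass on the Pertinent action auditing
framework with Gemini.

This module demonstrates the core architecture of action auditing at the
execution boundary. The distinction that matters:

    Output auditing asks: did the AI say something wrong?
        (Reactive — the Ends)
    Action auditing asks: did the AI do something unauthorized?
        (Proactive — the Means)

This pattern intercepts agent state BEFORE tool execution — not after.
The forensic log records every decision, authorized or blocked.

We spent two years worrying about whether AI was wrong.
The next five years are about whether AI was allowed.

— pertinent.com | Chris McClean
"""

import uuid
from typing import Dict, Any, Optional
from datetime import datetime


class AgenticState:
    """
    Represents the full state of an agent action request.

    Every field is populated before any tool executes. The audit_status
    and forensic_log fields provide the complete record of what was
    attempted and what was decided.

    The object starts fail-closed: audit_status is "PENDING_REVIEW" and
    execution_allowed is False until an auditor says otherwise.

    NOTE(review): execution_allowed and payload are plain mutable
    attributes. The component that actually runs the tool should take the
    decision straight from the auditor and execute the same payload that
    was audited, not a state object that passed through other code.
    """

    def __init__(self, agent_name: str, requested_tool: str, payload: Dict[str, Any]):
        self.transaction_id: str = str(uuid.uuid4())
        # Offset-aware local time: a naive timestamp is ambiguous once audit
        # records from hosts in different time zones (or across a DST change)
        # are correlated.
        self.timestamp: str = datetime.now().astimezone().isoformat()
        self.agent_name: str = agent_name
        self.requested_tool: str = requested_tool
        self.payload: Dict[str, Any] = payload
        self.audit_status: str = "PENDING_REVIEW"
        self.execution_allowed: bool = False
        self.forensic_log: Optional[str] = None

    def summary(self) -> str:
        """
        Render the audit record as a printable multi-line block.

        Field values are interpolated as-is. NOTE(review): agent_name and
        requested_tool come from the request, so a sink that stores this
        text as a log should escape control characters first.
        """
        return (
            f"\n{'=' * 60}\n"
            f"PERTINENT ACTION AUDIT TRAIL\n"
            f"{'=' * 60}\n"
            f"Transaction ID : {self.transaction_id}\n"
            f"Timestamp : {self.timestamp}\n"
            f"Agent : {self.agent_name}\n"
            f"Requested Tool : {self.requested_tool}\n"
            f"Audit Status : {self.audit_status}\n"
            f"Execution Allowed : {self.execution_allowed}\n"
            f"Forensic Entry : {self.forensic_log}\n"
            f"{'=' * 60}"
        )


class PertinentActionAuditor:
    """
    Intercepts and evaluates agent state prior to tool execution.

    The authorization_registry defines the explicit permission table —
    who is authorized to do what, under what conditions, and whether a
    human verification token is required before execution proceeds.

    The permission table is the object of governance. Who built it, when
    was it last reviewed, whose assumptions does it encode?
    """

    def __init__(self):
        # Per tool: max_records is the largest record count allowed
        # (0 = the tool is not record-based at all); requires_approval
        # means a human verification token must accompany the request.
        self.authorization_registry = {
            "CRM_Read": {
                "max_records": 100,
                "requires_approval": False,
                "description": "Read-only CRM access — low risk, no human token required"
            },
            "CRM_Write": {
                "max_records": 1,
                "requires_approval": True,
                "description": "Single-record CRM write — human token required"
            },
            "Database_Write": {
                "max_records": 1,
                "requires_approval": True,
                "description": "Database write operations — human token required, scope-limited"
            },
            "External_API_Post": {
                "max_records": 0,
                "requires_approval": True,
                "description": "External API calls — always requires human authorization"
            },
            "Email_Send": {
                "max_records": 1,
                "requires_approval": True,
                "description": "Outbound email — single recipient only, human token required"
            },
        }

    def evaluate_action_state(self, state: AgenticState) -> AgenticState:
        """
        Intercepts and evaluates the agent state prior to tool execution.

        This is the action auditing layer. It sits between the
        orchestration node and the production boundary. Nothing executes
        without passing through here first.

        Checks run in order and the first failure decides the outcome:
        tool registered, declared scope well-formed, scope within the
        tool's limit, human token present where required. Every exit path
        sets audit_status and forensic_log; execution_allowed becomes True
        only on the final path.

        Args:
            state: The pending request. It is mutated in place.

        Returns:
            The same state object, with the decision recorded on it.
        """
        print(f"\n[PERTINENT AUDIT] Intercepting transaction {state.transaction_id}")
        print(f"[PERTINENT AUDIT] Agent: '{state.agent_name}' requesting tool: '{state.requested_tool}'")

        tool = state.requested_tool
        # A non-string tool name can never match a registry key; treat it as
        # unregistered instead of letting an unhashable value raise before
        # any decision is recorded.
        rule = self.authorization_registry.get(tool) if isinstance(tool, str) else None

        # Tool not in permission table — block immediately
        if not rule:
            state.audit_status = "REJECTED_UNKNOWN_TOOL"
            state.forensic_log = (
                f"Action blocked: Tool '{tool}' is not registered within the "
                f"authorization registry. No execution permitted for unregistered tools."
            )
            return state

        max_records = rule["max_records"]

        # records_count is supplied by the requesting agent, so it is
        # validated before it is compared. Without this, a negative count
        # passes the limit check, a non-numeric one raises with no forensic
        # entry written, and an omitted one counts as 0 and passes trivially.
        if "records_count" in state.payload:
            records_affected = state.payload["records_count"]
            malformed = (
                isinstance(records_affected, bool)
                or not isinstance(records_affected, int)
                or records_affected < 0
            )
            if malformed:
                state.audit_status = "REJECTED_INVALID_SCOPE"
                state.forensic_log = (
                    f"Action blocked: 'records_count' for tool '{tool}' must be a "
                    f"non-negative integer, got {records_affected!r}. No execution permitted."
                )
                return state
        elif max_records > 0:
            # A scope-limited tool with no declared scope cannot be shown to
            # be within limits, so it is refused rather than assumed to be 0.
            state.audit_status = "REJECTED_INVALID_SCOPE"
            state.forensic_log = (
                f"Action blocked: Tool '{tool}' is scope-limited but the payload "
                f"declares no 'records_count'. No execution permitted."
            )
            return state
        else:
            # Tools that are not record-based need not declare a count.
            records_affected = 0

        # Check scope limits
        if records_affected > max_records and max_records > 0:
            state.audit_status = "BLOCKED_EXCESSIVE_SCOPE"
            state.forensic_log = (
                f"Security violation: Agent requested to affect {records_affected} records. "
                f"Maximum authorized scope for autonomous execution of '{tool}' is "
                f"{rule['max_records']} record(s). No execution permitted."
            )
            return state

        # Check for bulk operation on zero-record tools (e.g. External_API_Post)
        if max_records == 0 and records_affected > 0:
            state.audit_status = "BLOCKED_TOOL_NOT_FOR_BULK"
            state.forensic_log = (
                f"Security violation: Tool '{tool}' is not authorized for record-based "
                f"operations. Attempted scope: {records_affected} records. Blocked."
            )
            return state

        # Check for human verification token on approval-required tools.
        # NOTE(review): this is a presence check only. The token is read from
        # the same payload the agent supplies and is not verified against an
        # issuer, this transaction, or an expiry; a real deployment needs the
        # token validated out-of-band before it counts as human approval.
        if rule["requires_approval"]:
            if not state.payload.get("human_verification_token"):
                state.audit_status = "HALTED_AWAITING_HUMAN_TOKEN"
                state.forensic_log = (
                    f"State change suspended: Tool '{tool}' requires human-in-the-loop "
                    f"validation. No human verification token present in payload. "
                    f"Routing to human review queue."
                )
                return state

        # All checks passed — authorize execution
        state.audit_status = "VERIFIED_COMPLIANT"
        state.execution_allowed = True
        state.forensic_log = (
            f"Action verified against authorization registry. Tool '{tool}' is registered, "
            f"scope is within limits, and authorization requirements are satisfied. "
            f"Safe for execution."
        )
        return state


# =============================================================================
# SIMULATION 1: Unauthorized bulk write
# An agent attempts to modify 450 financial records without a human token.
# This is the Robert Chen scenario at scale.
# =============================================================================

print("\n" + "=" * 60)
print("SIMULATION 1: Bulk Database Write — No Human Token")
print("=" * 60)

bulk_write_payload = {
    "target_table": "usr_financial_profiles",
    "records_count": 450,
    "operation": "BATCH_UPDATE_STATUS",
    "human_verification_token": None  # No token present
}

state_1 = AgenticState(
    agent_name="OptimizationAgent_v2",
    requested_tool="Database_Write",
    payload=bulk_write_payload
)

auditor = PertinentActionAuditor()
result_1 = auditor.evaluate_action_state(state_1)
print(result_1.summary())


# =============================================================================
# SIMULATION 2: Authorized single-record write with human token
# The same agent, same tool, but within scope and with authorization.
# =============================================================================

print("\n" + "=" * 60)
print("SIMULATION 2: Single Database Write — Human Token Present")
print("=" * 60)

authorized_payload = {
    "target_table": "usr_financial_profiles",
    "records_count": 1,
    "operation": "UPDATE_STATUS",
    "record_id": "usr_00421",
    "human_verification_token": "HVT-2026-05-31-APPROVED"
}

state_2 = AgenticState(
    agent_name="OptimizationAgent_v2",
    requested_tool="Database_Write",
    payload=authorized_payload
)

result_2 = auditor.evaluate_action_state(state_2)
print(result_2.summary())


# =============================================================================
# SIMULATION 3: Unregistered tool attempt
# An agent tries to call a tool not in the permission table.
# =============================================================================

print("\n" + "=" * 60)
print("SIMULATION 3: Unregistered Tool Attempt")
print("=" * 60)

unknown_payload = {
    "endpoint": "/internal/admin/delete_user",
    "records_count": 1,
}

state_3 = AgenticState(
    agent_name="CleanupAgent_v1",
    requested_tool="Admin_Delete",
    payload=unknown_payload
)

result_3 = auditor.evaluate_action_state(state_3)
print(result_3.summary())


# =============================================================================
# SIMULATION 4: Read operation — no token required
# Low-risk, within scope, proceeds without human intervention.
# =============================================================================

print("\n" + "=" * 60)
print("SIMULATION 4: CRM Read — Authorized, No Token Required")
print("=" * 60)

read_payload = {
    "query": "SELECT * FROM leads WHERE status = 'warm'",
    "records_count": 45,
}

state_4 = AgenticState(
    agent_name="LeadScoringAgent_v3",
    requested_tool="CRM_Read",
    payload=read_payload
)

result_4 = auditor.evaluate_action_state(state_4)
print(result_4.summary())


# =============================================================================
# AUDIT SUMMARY
# =============================================================================

print("\n" + "=" * 60)
print("AUDIT SUMMARY — ALL TRANSACTIONS")
print("=" * 60)

for i, result in enumerate([result_1, result_2, result_3, result_4], 1):
    allowed = "ALLOWED" if result.execution_allowed else "BLOCKED"
    print(f" [{i}] {result.agent_name:<30} {result.requested_tool:<25} {allowed} — {result.audit_status}")

print("=" * 60)
print("\nAction auditing is proactive. The Means is important.")
print("Output auditing is reactive. The Ends is important.")
print("pertinent.com")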