- Add .gitignore to prevent secret leakage - Implement YOLO guard with double-confirmation flow * Environment variable gate (YOLO_MODE=1) * Typed confirmation phrase + one-time code * Time-limited approval tokens (5 min TTL) * Single-use tokens with audit logging - Add rate limiting (10/min, 100/hour, 500/day) * Token bucket implementation * Per-session tracking * Automatic bucket reset - Integrate safeguards into existing code * Rate limiter in SecureBridge.send_message() * YOLO guard in YOLOMode.execute_command() * Dry-run mode by default - Add security test suite * .gitignore validation * YOLO guard functional tests * Rate limiter verification * Integration checks All security measures tested and verified. No secrets found in git history.
362 lines
10 KiB
Python
362 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
YOLO Mode Guard - Multi-stage confirmation system
|
|
|
|
Prevents accidental/unauthorized command execution by requiring:
|
|
1. Environment variable flag (YOLO_MODE=1)
|
|
2. Typed confirmation phrase
|
|
3. One-time random code
|
|
4. Time-limited approval token for actual execution
|
|
|
|
Author: Danny Stocker
|
|
License: MIT
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import secrets
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Dict, Optional
|
|
|
|
class YOLOGuard:
|
|
"""Multi-stage confirmation system for dangerous operations"""
|
|
|
|
TOKEN_FILE = Path.home() / ".yolo_tokens.json"
|
|
AUDIT_LOG = Path.home() / "yolo_audit.log"
|
|
|
|
@classmethod
|
|
def require_confirmation(cls) -> bool:
|
|
"""
|
|
Stage 1: Manual confirmation with typed phrases
|
|
|
|
Returns:
|
|
True if user completes confirmation flow
|
|
"""
|
|
# Check environment variable
|
|
if os.getenv("YOLO_MODE") != "1":
|
|
print("❌ YOLO mode is disabled.")
|
|
print(" Set YOLO_MODE=1 to enable.")
|
|
return False
|
|
|
|
# Display warning
|
|
print("\n" + "="*70)
|
|
print("⚠️ WARNING: YOLO MODE ENABLES COMMAND EXECUTION")
|
|
print("="*70)
|
|
print("\nThis allows AI agents to run commands on your system.")
|
|
print("Commands will have access to your files and permissions.")
|
|
print("\nOnly proceed if:")
|
|
print(" • You understand the security implications")
|
|
print(" • You are in an isolated/sandboxed environment")
|
|
print(" • You have backups of important data")
|
|
print(" • You will supervise all operations")
|
|
print()
|
|
|
|
# Require exact confirmation phrase
|
|
required_phrase = "I UNDERSTAND THE RISKS"
|
|
confirmation = input(f"Type '{required_phrase}' to continue: ").strip()
|
|
|
|
if confirmation != required_phrase:
|
|
print("❌ Confirmation phrase incorrect. Aborting.")
|
|
cls._log_audit("CONFIRMATION_FAILED", {
|
|
"reason": "incorrect_phrase",
|
|
"provided": confirmation[:20] + "..."
|
|
})
|
|
return False
|
|
|
|
# Generate and require one-time code
|
|
code = secrets.token_hex(3) # 6-character hex string
|
|
print(f"\nOne-time code: {code}")
|
|
user_code = input("Retype the code above: ").strip()
|
|
|
|
if user_code != code:
|
|
print("❌ Code mismatch. Aborting.")
|
|
cls._log_audit("CONFIRMATION_FAILED", {
|
|
"reason": "code_mismatch"
|
|
})
|
|
return False
|
|
|
|
# Success
|
|
cls._log_audit("YOLO_ENABLED", {
|
|
"method": "interactive_confirmation",
|
|
"timestamp": datetime.now().isoformat()
|
|
})
|
|
print("\n✅ YOLO mode enabled for this session")
|
|
print(" Use --generate-token to create execution tokens\n")
|
|
|
|
return True
|
|
|
|
@classmethod
|
|
def generate_approval_token(cls, ttl_seconds: int = 300) -> str:
|
|
"""
|
|
Stage 2: Generate time-limited execution token
|
|
|
|
Args:
|
|
ttl_seconds: Token lifetime in seconds (default: 5 minutes)
|
|
|
|
Returns:
|
|
URL-safe token string
|
|
"""
|
|
token = secrets.token_urlsafe(32)
|
|
expires_at = datetime.now() + timedelta(seconds=ttl_seconds)
|
|
|
|
# Load existing tokens
|
|
tokens = cls._load_tokens()
|
|
|
|
# Store new token
|
|
tokens[token] = {
|
|
"created_at": datetime.now().isoformat(),
|
|
"expires_at": expires_at.isoformat(),
|
|
"ttl_seconds": ttl_seconds,
|
|
"used": False
|
|
}
|
|
|
|
cls._save_tokens(tokens)
|
|
cls._log_audit("TOKEN_GENERATED", {
|
|
"token_preview": token[:10] + "...",
|
|
"ttl_seconds": ttl_seconds,
|
|
"expires_at": expires_at.isoformat()
|
|
})
|
|
|
|
print(f"\n✅ Approval token generated")
|
|
print(f" Token: {token}")
|
|
print(f" Valid for: {ttl_seconds} seconds ({ttl_seconds//60} minutes)")
|
|
print(f" Expires at: {expires_at.strftime('%Y-%m-%d %H:%M:%S')}")
|
|
print(f"\nUse with:")
|
|
print(f" --execute --approval-token {token}")
|
|
print()
|
|
|
|
return token
|
|
|
|
@classmethod
|
|
def validate_approval_token(cls, token: str) -> bool:
|
|
"""
|
|
Stage 3: Validate and consume approval token
|
|
|
|
Args:
|
|
token: Token to validate
|
|
|
|
Returns:
|
|
True if token is valid, False otherwise
|
|
"""
|
|
tokens = cls._load_tokens()
|
|
|
|
# Check if token exists
|
|
if token not in tokens:
|
|
cls._log_audit("TOKEN_INVALID", {
|
|
"token_preview": token[:10] + "...",
|
|
"reason": "not_found"
|
|
})
|
|
return False
|
|
|
|
token_data = tokens[token]
|
|
|
|
# Check if already used
|
|
if token_data["used"]:
|
|
cls._log_audit("TOKEN_INVALID", {
|
|
"token_preview": token[:10] + "...",
|
|
"reason": "already_used",
|
|
"used_at": token_data.get("used_at", "unknown")
|
|
})
|
|
return False
|
|
|
|
# Check expiration
|
|
expires_at = datetime.fromisoformat(token_data["expires_at"])
|
|
if datetime.now() > expires_at:
|
|
cls._log_audit("TOKEN_INVALID", {
|
|
"token_preview": token[:10] + "...",
|
|
"reason": "expired",
|
|
"expired_at": token_data["expires_at"]
|
|
})
|
|
return False
|
|
|
|
# Mark as used
|
|
tokens[token]["used"] = True
|
|
tokens[token]["used_at"] = datetime.now().isoformat()
|
|
cls._save_tokens(tokens)
|
|
|
|
cls._log_audit("TOKEN_VALIDATED", {
|
|
"token_preview": token[:10] + "...",
|
|
"created_at": token_data["created_at"],
|
|
"used_at": tokens[token]["used_at"]
|
|
})
|
|
|
|
return True
|
|
|
|
@classmethod
|
|
def _load_tokens(cls) -> Dict:
|
|
"""Load tokens from file"""
|
|
if not cls.TOKEN_FILE.exists():
|
|
return {}
|
|
|
|
try:
|
|
return json.loads(cls.TOKEN_FILE.read_text())
|
|
except json.JSONDecodeError:
|
|
# Corrupted file, start fresh
|
|
return {}
|
|
|
|
@classmethod
|
|
def _save_tokens(cls, tokens: Dict):
|
|
"""Save tokens to file with restricted permissions"""
|
|
cls.TOKEN_FILE.write_text(json.dumps(tokens, indent=2))
|
|
cls.TOKEN_FILE.chmod(0o600) # Owner read/write only
|
|
|
|
@classmethod
|
|
def _log_audit(cls, action: str, details: Dict):
|
|
"""Append audit entry to log file"""
|
|
entry = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"action": action,
|
|
"details": details
|
|
}
|
|
|
|
# Ensure log directory exists
|
|
cls.AUDIT_LOG.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Append as JSON lines
|
|
with open(cls.AUDIT_LOG, "a") as f:
|
|
f.write(json.dumps(entry) + "\n")
|
|
|
|
@classmethod
|
|
def cleanup_expired_tokens(cls) -> int:
|
|
"""Remove expired tokens from storage"""
|
|
tokens = cls._load_tokens()
|
|
now = datetime.now()
|
|
|
|
expired = []
|
|
for token, data in tokens.items():
|
|
expires_at = datetime.fromisoformat(data["expires_at"])
|
|
if now > expires_at:
|
|
expired.append(token)
|
|
|
|
for token in expired:
|
|
del tokens[token]
|
|
|
|
if expired:
|
|
cls._save_tokens(tokens)
|
|
cls._log_audit("TOKENS_CLEANED", {
|
|
"count": len(expired)
|
|
})
|
|
|
|
return len(expired)
|
|
|
|
@classmethod
|
|
def list_active_tokens(cls) -> list:
|
|
"""List all valid (non-expired, unused) tokens"""
|
|
tokens = cls._load_tokens()
|
|
now = datetime.now()
|
|
|
|
active = []
|
|
for token, data in tokens.items():
|
|
expires_at = datetime.fromisoformat(data["expires_at"])
|
|
if not data["used"] and now <= expires_at:
|
|
active.append({
|
|
"token_preview": token[:10] + "...",
|
|
"created_at": data["created_at"],
|
|
"expires_at": data["expires_at"],
|
|
"ttl_seconds": data["ttl_seconds"]
|
|
})
|
|
|
|
return active
|
|
|
|
|
|
def main():
|
|
"""CLI interface for YOLO guard"""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="YOLO Mode Guard - Safe command execution gating",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
# Enable YOLO mode (requires confirmation)
|
|
export YOLO_MODE=1
|
|
python yolo_guard.py --enable-yolo
|
|
|
|
# Generate 5-minute token
|
|
python yolo_guard.py --generate-token --ttl 300
|
|
|
|
# List active tokens
|
|
python yolo_guard.py --list-tokens
|
|
|
|
# Clean up expired tokens
|
|
python yolo_guard.py --cleanup
|
|
"""
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--enable-yolo",
|
|
action="store_true",
|
|
help="Enable YOLO mode with interactive confirmation"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--generate-token",
|
|
action="store_true",
|
|
help="Generate time-limited approval token"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--ttl",
|
|
type=int,
|
|
default=300,
|
|
help="Token TTL in seconds (default: 300 = 5 minutes)"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--list-tokens",
|
|
action="store_true",
|
|
help="List active (valid) tokens"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--cleanup",
|
|
action="store_true",
|
|
help="Remove expired tokens"
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Enable YOLO mode
|
|
if args.enable_yolo:
|
|
success = YOLOGuard.require_confirmation()
|
|
sys.exit(0 if success else 1)
|
|
|
|
# Generate token
|
|
if args.generate_token:
|
|
if os.getenv("YOLO_MODE") != "1":
|
|
print("❌ YOLO_MODE not enabled. Set YOLO_MODE=1 first.")
|
|
sys.exit(1)
|
|
|
|
YOLOGuard.generate_approval_token(args.ttl)
|
|
sys.exit(0)
|
|
|
|
# List active tokens
|
|
if args.list_tokens:
|
|
tokens = YOLOGuard.list_active_tokens()
|
|
|
|
if not tokens:
|
|
print("No active tokens.")
|
|
else:
|
|
print(f"\nActive tokens: {len(tokens)}")
|
|
for token in tokens:
|
|
print(f"\n Token: {token['token_preview']}")
|
|
print(f" Created: {token['created_at']}")
|
|
print(f" Expires: {token['expires_at']}")
|
|
print(f" TTL: {token['ttl_seconds']}s")
|
|
|
|
sys.exit(0)
|
|
|
|
# Cleanup expired
|
|
if args.cleanup:
|
|
count = YOLOGuard.cleanup_expired_tokens()
|
|
print(f"✅ Removed {count} expired token(s)")
|
|
sys.exit(0)
|
|
|
|
# No arguments - show help
|
|
parser.print_help()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|