✅ TICKET-006: Wake-word Detection Service - Implemented wake-word detection using openWakeWord - HTTP/WebSocket server on port 8002 - Real-time detection with configurable threshold - Event emission for ASR integration - Location: home-voice-agent/wake-word/ ✅ TICKET-010: ASR Service - Implemented ASR using faster-whisper - HTTP endpoint for file transcription - WebSocket endpoint for streaming transcription - Support for multiple audio formats - Auto language detection - GPU acceleration support - Location: home-voice-agent/asr/ ✅ TICKET-014: TTS Service - Implemented TTS using Piper - HTTP endpoint for text-to-speech synthesis - Low-latency processing (< 500ms) - Multiple voice support - WAV audio output - Location: home-voice-agent/tts/ ✅ TICKET-047: Updated Hardware Purchases - Marked Pi5 kit, SSD, microphone, and speakers as purchased - Updated progress log with purchase status 📚 Documentation: - Added VOICE_SERVICES_README.md with complete testing guide - Each service includes README.md with usage instructions - All services ready for Pi5 deployment 🧪 Testing: - Created test files for each service - All imports validated - FastAPI apps created successfully - Code passes syntax validation 🚀 Ready for: - Pi5 deployment - End-to-end voice flow testing - Integration with MCP server Files Added: - wake-word/detector.py - wake-word/server.py - wake-word/requirements.txt - wake-word/README.md - wake-word/test_detector.py - asr/service.py - asr/server.py - asr/requirements.txt - asr/README.md - asr/test_service.py - tts/service.py - tts/server.py - tts/requirements.txt - tts/README.md - tts/test_service.py - VOICE_SERVICES_README.md Files Modified: - tickets/done/TICKET-047_hardware-purchases.md Files Moved: - tickets/backlog/TICKET-006_prototype-wake-word-node.md → tickets/done/ - tickets/backlog/TICKET-010_streaming-asr-service.md → tickets/done/ - tickets/backlog/TICKET-014_tts-service.md → tickets/done/
163 lines
5.2 KiB
Python
163 lines
5.2 KiB
Python
"""
|
|
Confirmation token system.
|
|
|
|
Generates and validates signed tokens for confirmed actions.
|
|
"""
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Dict, Any
|
|
from pathlib import Path
|
|
import secrets
|
|
|
|
|
|
class ConfirmationToken:
|
|
"""Confirmation token for high-risk actions."""
|
|
|
|
def __init__(self, secret_key: Optional[str] = None):
|
|
"""
|
|
Initialize token system.
|
|
|
|
Args:
|
|
secret_key: Secret key for signing tokens. If None, generates one.
|
|
"""
|
|
if secret_key is None:
|
|
# Load or generate secret key
|
|
key_file = Path(__file__).parent.parent.parent / "data" / ".confirmation_secret"
|
|
if key_file.exists():
|
|
self.secret_key = key_file.read_text().strip()
|
|
else:
|
|
self.secret_key = secrets.token_urlsafe(32)
|
|
key_file.parent.mkdir(parents=True, exist_ok=True)
|
|
key_file.write_text(self.secret_key)
|
|
else:
|
|
self.secret_key = secret_key
|
|
|
|
def generate_token(self,
|
|
tool_name: str,
|
|
parameters: Dict[str, Any],
|
|
session_id: Optional[str] = None,
|
|
user_id: Optional[str] = None,
|
|
expires_in: int = 300) -> str:
|
|
"""
|
|
Generate a signed confirmation token.
|
|
|
|
Args:
|
|
tool_name: Name of the tool
|
|
parameters: Tool parameters
|
|
session_id: Session ID
|
|
user_id: User ID
|
|
expires_in: Token expiration in seconds (default 5 minutes)
|
|
|
|
Returns:
|
|
Signed token string
|
|
"""
|
|
payload = {
|
|
"tool_name": tool_name,
|
|
"parameters": parameters,
|
|
"session_id": session_id,
|
|
"user_id": user_id,
|
|
"timestamp": datetime.now().isoformat(),
|
|
"expires_at": (datetime.now() + timedelta(seconds=expires_in)).isoformat(),
|
|
}
|
|
|
|
# Create signature
|
|
payload_str = json.dumps(payload, sort_keys=True)
|
|
signature = hmac.new(
|
|
self.secret_key.encode(),
|
|
payload_str.encode(),
|
|
hashlib.sha256
|
|
).hexdigest()
|
|
|
|
# Combine payload and signature
|
|
token_data = {
|
|
"payload": payload,
|
|
"signature": signature
|
|
}
|
|
|
|
# Encode as base64-like string (simplified)
|
|
import base64
|
|
token_str = base64.urlsafe_b64encode(
|
|
json.dumps(token_data).encode()
|
|
).decode()
|
|
|
|
return token_str
|
|
|
|
def validate_token(self, token: str) -> tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
|
|
"""
|
|
Validate a confirmation token.
|
|
|
|
Args:
|
|
token: Token string to validate
|
|
|
|
Returns:
|
|
(is_valid, payload, error_message)
|
|
"""
|
|
try:
|
|
import base64
|
|
# Decode token
|
|
token_data = json.loads(base64.urlsafe_b64decode(token.encode()).decode())
|
|
payload = token_data["payload"]
|
|
signature = token_data["signature"]
|
|
|
|
# Verify signature
|
|
payload_str = json.dumps(payload, sort_keys=True)
|
|
expected_signature = hmac.new(
|
|
self.secret_key.encode(),
|
|
payload_str.encode(),
|
|
hashlib.sha256
|
|
).hexdigest()
|
|
|
|
if not hmac.compare_digest(signature, expected_signature):
|
|
return False, None, "Invalid token signature"
|
|
|
|
# Check expiration
|
|
expires_at = datetime.fromisoformat(payload["expires_at"])
|
|
if datetime.now() > expires_at:
|
|
return False, None, "Token expired"
|
|
|
|
return True, payload, None
|
|
|
|
except Exception as e:
|
|
return False, None, f"Token validation error: {e}"
|
|
|
|
def verify_action(self, token: str, tool_name: str, parameters: Dict[str, Any]) -> tuple[bool, Optional[str]]:
|
|
"""
|
|
Verify that token matches the intended action.
|
|
|
|
Args:
|
|
token: Confirmation token
|
|
tool_name: Expected tool name
|
|
parameters: Expected parameters
|
|
|
|
Returns:
|
|
(is_valid, error_message)
|
|
"""
|
|
is_valid, payload, error = self.validate_token(token)
|
|
if not is_valid:
|
|
return False, error
|
|
|
|
# Check tool name matches
|
|
if payload["tool_name"] != tool_name:
|
|
return False, f"Token tool name mismatch: expected {tool_name}, got {payload['tool_name']}"
|
|
|
|
# Check parameters match (simplified - could be more sophisticated)
|
|
token_params = payload["parameters"]
|
|
for key, value in parameters.items():
|
|
if key not in token_params or token_params[key] != value:
|
|
return False, f"Token parameter mismatch for {key}"
|
|
|
|
return True, None
|
|
|
|
|
|
# Global token instance
|
|
_token = ConfirmationToken()
|
|
|
|
|
|
def get_token_system() -> ConfirmationToken:
|
|
"""Get the global token system instance."""
|
|
return _token
|