#!/usr/bin/env python3
"""
ASR HTTP/WebSocket server.

Provides endpoints for speech-to-text transcription.
"""

import logging
import asyncio
import json
import io
from typing import List, Optional

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, UploadFile, File, Form
from fastapi.responses import JSONResponse, PlainTextResponse
from pydantic import BaseModel

from .service import ASRService, get_service

logger = logging.getLogger(__name__)

app = FastAPI(title="ASR Service", version="0.1.0")

# Global service instance; stays None when initialization fails so that the
# endpoints can report "unavailable" instead of crashing.
asr_service: Optional[ASRService] = None


@app.on_event("startup")
async def startup():
    """Initialize the ASR service on startup.

    A failure is logged (with traceback) and leaves ``asr_service`` as
    ``None``; the server still starts and the endpoints report the service
    as unavailable.
    """
    global asr_service
    try:
        asr_service = get_service()
        logger.info("ASR service initialized")
    except Exception as e:
        logger.exception("Failed to initialize ASR service: %s", e)
        asr_service = None


@app.get("/health")
async def health():
    """Health check endpoint.

    Returns:
        A dict with the service status plus the model size and device read
        from the service, or ``None`` for both when it is unavailable.
    """
    return {
        "status": "healthy" if asr_service else "unavailable",
        "service": "asr",
        "model": asr_service.model_size if asr_service else None,
        "device": asr_service.device if asr_service else None
    }


@app.post("/transcribe")
async def transcribe(
    audio: UploadFile = File(...),
    language: Optional[str] = Form(None),
    format: str = Form("json")
):
    """
    Transcribe audio file.

    Args:
        audio: Audio file (WAV, MP3, FLAC, etc.)
        language: Language code (optional, auto-detect if not provided)
        format: Response format ("text" or "json")

    Returns:
        A plain-text response containing ``result["text"]`` when ``format``
        is "text", otherwise the full result as JSON.

    Raises:
        HTTPException: 503 when the ASR service is unavailable, 500 when
            reading or transcribing the upload fails.
    """
    if not asr_service:
        raise HTTPException(status_code=503, detail="ASR service unavailable")

    try:
        # Read audio file
        audio_bytes = await audio.read()

        # Transcribe
        # NOTE(review): this call is not awaited, so the event loop is blocked
        # until it returns. Consider offloading to a worker thread once the
        # service's thread-safety is confirmed.
        result = asr_service.transcribe_file(
            audio_bytes,
            format=format,
            language=language
        )

        if format == "text":
            return PlainTextResponse(result["text"])
        return JSONResponse(result)
    except Exception as e:
        logger.exception("Transcription error: %s", e)
        # Chain the cause so the original failure stays visible in tracebacks.
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/languages")
async def get_languages():
    """Get supported languages.

    Returns:
        A dict with a static ``languages`` list of ``code``/``name`` entries.
    """
    # Whisper supports many languages
    languages = [
        {"code": "en", "name": "English"},
        {"code": "es", "name": "Spanish"},
        {"code": "fr", "name": "French"},
        {"code": "de", "name": "German"},
        {"code": "it", "name": "Italian"},
        {"code": "pt", "name": "Portuguese"},
        {"code": "ru", "name": "Russian"},
        {"code": "ja", "name": "Japanese"},
        {"code": "ko", "name": "Korean"},
        {"code": "zh", "name": "Chinese"},
    ]
    return {"languages": languages}


@app.websocket("/stream")
async def websocket_stream(websocket: WebSocket):
    """WebSocket endpoint for streaming transcription.

    Protocol as implemented here:
      * Binary frames are buffered as audio chunks and acknowledged with
        ``{"type": "partial", "status": "receiving"}``.
      * Text frames are JSON control messages. ``{"action": "end"}``
        transcribes the buffered audio (if any), replies with a ``final``
        or ``error`` message and clears the buffer. ``{"action": "reset"}``
        clears the buffer without transcribing.
      * A malformed control message is answered with an ``error`` message
        and the connection stays open.
      * After 30 seconds without a frame a ``keepalive`` message is sent.
    """
    if not asr_service:
        await websocket.close(code=1003, reason="ASR service unavailable")
        return

    await websocket.accept()
    logger.info("WebSocket client connected for streaming transcription")

    audio_chunks = []

    try:
        while True:
            # Receive audio data or control message
            try:
                data = await asyncio.wait_for(websocket.receive(), timeout=30.0)
            except asyncio.TimeoutError:
                # Send keepalive
                await websocket.send_json({"type": "keepalive"})
                continue

            # Handle disconnect first so it is never mistaken for a payload.
            if data.get("type") == "websocket.disconnect":
                break

            # ASGI servers may include both "text" and "bytes" keys with the
            # unused one set to None, so test the value rather than the key.
            text_payload = data.get("text")
            binary_payload = data.get("bytes")

            if text_payload is not None:
                # Control message
                try:
                    message = json.loads(text_payload)
                except json.JSONDecodeError as e:
                    # Bad client input should not tear down the stream.
                    logger.warning("Invalid control message: %s", e)
                    await websocket.send_json({
                        "type": "error",
                        "error": str(e)
                    })
                    continue

                if not isinstance(message, dict):
                    await websocket.send_json({
                        "type": "error",
                        "error": "Control message must be a JSON object"
                    })
                    continue

                action = message.get("action")

                if action == "end":
                    # Process accumulated audio
                    if audio_chunks:
                        try:
                            # NOTE(review): synchronous call; blocks the event
                            # loop until it returns.
                            result = asr_service.transcribe_stream(audio_chunks)
                            await websocket.send_json({
                                "type": "final",
                                "text": result["text"],
                                "segments": result["segments"],
                                "language": result["language"]
                            })
                        except Exception as e:
                            logger.exception("Transcription error: %s", e)
                            await websocket.send_json({
                                "type": "error",
                                "error": str(e)
                            })
                    audio_chunks = []

                elif action == "reset":
                    audio_chunks = []

            elif binary_payload is not None:
                # Audio chunk (binary)
                # Note: This is simplified - real implementation would need
                # proper audio format handling (PCM, sample rate, etc.)
                audio_chunks.append(binary_payload)

                # Send partial result (if available)
                # For now, just acknowledge
                await websocket.send_json({
                    "type": "partial",
                    "status": "receiving"
                })

    except WebSocketDisconnect:
        logger.info("WebSocket client disconnected")
    except Exception as e:
        logger.exception("WebSocket error: %s", e)
        try:
            await websocket.send_json({
                "type": "error",
                "error": str(e)
            })
        except Exception:
            # Best effort only: the socket may already be closed. Catching
            # Exception (not a bare except) lets task cancellation propagate.
            logger.debug("Could not deliver error message to client", exc_info=True)
    finally:
        try:
            await websocket.close()
        except Exception:
            # Closing an already-closed socket raises; nothing left to do.
            logger.debug("WebSocket already closed", exc_info=True)


if __name__ == "__main__":
    import uvicorn

    logging.basicConfig(level=logging.INFO)
    uvicorn.run(app, host="0.0.0.0", port=8001)