"""FastAPI backend for LLM Council."""

from fastapi import FastAPI, HTTPException, UploadFile, File, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, Response
from pydantic import BaseModel
from typing import List, Dict, Any
import uuid
import json
import asyncio
import re
import time
import traceback
from datetime import datetime, timezone

from . import storage
from . import documents
from .config import MAX_DOC_PREVIEW_CHARS, COUNCIL_MODELS
from .docs_context import build_docs_context
from .llm_client import (
    get_provider_info,
    list_models as llm_list_models,
    query_model,
    LLM_TIMEOUT_SECONDS,
    MAX_TOKENS,
)
from .council import (
    run_full_council,
    generate_conversation_title,
    stage1_collect_responses,
    stage2_collect_rankings,
    stage3_synthesize_final,
    calculate_aggregate_rankings,
    _format_docs_context,
)

app = FastAPI(title="LLM Council API")

# Enable CORS for local development
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "http://localhost:5174", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Matches "@<digits>" with an optional single space after the "@".
# The digit run is consumed greedily so "@10" is never treated as "@1" + "0".
_DOC_REFERENCE_RE = re.compile(r"@ ?([0-9]+)")


class CreateConversationRequest(BaseModel):
    """Request to create a new conversation."""


class SendMessageRequest(BaseModel):
    """Request to send a message in a conversation."""

    content: str


class ConversationMetadata(BaseModel):
    """Conversation metadata for list view."""

    id: str
    created_at: str
    title: str
    message_count: int


class Conversation(BaseModel):
    """Full conversation with all messages."""

    id: str
    created_at: str
    title: str
    messages: List[Dict[str, Any]]


def _get_conversation_or_404(conversation_id: str) -> Dict[str, Any]:
    """Return the stored conversation, or raise a 404 HTTPException if it is missing."""
    conversation = storage.get_conversation(conversation_id)
    if conversation is None:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return conversation


def _document_summary(meta) -> Dict[str, Any]:
    """Return the JSON shape used for a document in list and upload responses."""
    return {"id": meta.id, "filename": meta.filename, "bytes": meta.bytes}


def _sse(payload: Dict[str, Any]) -> str:
    """Frame a JSON payload as a single Server-Sent Events ``data:`` message."""
    return f"data: {json.dumps(payload)}\n\n"


@app.get("/")
async def root():
    """Health check endpoint."""
    return {"status": "ok", "service": "LLM Council API"}


@app.get("/api/llm/status")
async def llm_status(
    probe: bool = Query(False, description="If true, query the provider for available models"),
):
    """
    Returns current LLM provider configuration and (optionally) probes the provider.

    When ``probe`` is true, the result of ``list_models`` is added under the
    ``remote_models`` key.
    """
    info = get_provider_info()
    if probe:
        info["remote_models"] = await llm_list_models()
    return info


@app.get("/api/conversations", response_model=List[ConversationMetadata])
async def list_conversations():
    """List all conversations (metadata only)."""
    return storage.list_conversations()


@app.post("/api/conversations", response_model=Conversation)
async def create_conversation(request: CreateConversationRequest):
    """Create a new conversation with a freshly generated UUID4 id."""
    conversation_id = str(uuid.uuid4())
    return storage.create_conversation(conversation_id)


# NOTE: this route must be declared before "/api/conversations/{conversation_id}".
# FastAPI matches path operations in declaration order, so otherwise "search"
# would be captured as a conversation id and answered with a 404.
@app.get("/api/conversations/search")
async def search_conversations(q: str = ""):
    """
    Search conversations by title and content.

    Matching is a case-insensitive substring test against the conversation
    title and the ``content`` field of each message. Queries shorter than two
    characters (after stripping whitespace) return an empty list.

    Returns the metadata entries of matching conversations.
    """
    needle = q.strip().lower()
    if len(needle) < 2:
        return []

    results = []
    for conv_meta in storage.list_conversations():
        title = (conv_meta.get('title', '') or '').lower()
        if needle in title:
            # Title already matches: no need to load the full conversation.
            results.append(conv_meta)
            continue

        conv = storage.get_conversation(conv_meta['id'])
        if not conv:
            continue
        # Messages without a "content" field (or with a null one) simply never match.
        if any(needle in (msg.get('content') or '').lower() for msg in conv.get('messages', [])):
            results.append(conv_meta)

    return results


@app.get("/api/conversations/{conversation_id}", response_model=Conversation)
async def get_conversation(conversation_id: str):
    """Get a specific conversation with all its messages (404 if unknown)."""
    return _get_conversation_or_404(conversation_id)


@app.delete("/api/conversations/{conversation_id}")
async def delete_conversation(conversation_id: str):
    """Delete a conversation and its associated documents."""
    try:
        storage.delete_conversation(conversation_id)
    except ValueError as e:
        # storage signals a failed delete with ValueError; surface it as a 404.
        raise HTTPException(status_code=404, detail=str(e))
    return {"ok": True}


@app.get("/api/conversations/{conversation_id}/documents")
async def list_conversation_documents(conversation_id: str):
    """List uploaded markdown documents for a conversation."""
    _get_conversation_or_404(conversation_id)
    return [_document_summary(d) for d in documents.list_documents(conversation_id)]


@app.post("/api/conversations/{conversation_id}/documents")
async def upload_conversation_document(
    conversation_id: str,
    files: List[UploadFile] = File(default=[]),
    file: UploadFile = File(default=None),
):
    """
    Upload one or more markdown documents (.md) for a conversation.

    Backwards compatible:
    - old clients send a single "file" field
    - new clients can send multiple "files" fields

    Returns the single document object when exactly one file was uploaded,
    otherwise ``{"uploaded": [...]}``.

    Raises:
        HTTPException: 404 if the conversation does not exist; 400 if no file
            was sent, a file is not ``.md``, or the document store rejects it.
    """
    _get_conversation_or_404(conversation_id)

    incoming: List[UploadFile] = []
    if file is not None:
        incoming.append(file)
    if files:
        incoming.extend(files)

    if not incoming:
        raise HTTPException(status_code=400, detail="No files uploaded")

    # Validate every filename before saving anything, so a bad extension in a
    # later file does not leave earlier files of the same request persisted.
    named_uploads = [(f.filename or "document.md", f) for f in incoming]
    for filename, _ in named_uploads:
        if not filename.lower().endswith(".md"):
            raise HTTPException(status_code=400, detail="Only .md files are supported")

    uploaded = []
    for filename, upload in named_uploads:
        content = await upload.read()
        try:
            meta = documents.save_markdown_document(conversation_id, filename, content)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e))
        uploaded.append(_document_summary(meta))

    # Back-compat: if single file uploaded, return the single object shape.
    if len(uploaded) == 1:
        return uploaded[0]
    return {"uploaded": uploaded}


@app.get("/api/conversations/{conversation_id}/documents/{doc_id}")
async def get_conversation_document(conversation_id: str, doc_id: str):
    """
    Fetch a markdown document's text (truncated for safety).

    Text longer than ``MAX_DOC_PREVIEW_CHARS`` is cut and suffixed with "..."
    so the returned content is at most that many characters.
    """
    _get_conversation_or_404(conversation_id)
    try:
        text = documents.read_document_text(conversation_id, doc_id)
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Document not found")
    if len(text) > MAX_DOC_PREVIEW_CHARS:
        text = text[: MAX_DOC_PREVIEW_CHARS - 3] + "..."
    return {"id": doc_id, "content": text}


@app.delete("/api/conversations/{conversation_id}/documents/{doc_id}")
async def delete_conversation_document(conversation_id: str, doc_id: str):
    """Delete a previously uploaded document."""
    _get_conversation_or_404(conversation_id)
    try:
        documents.delete_document(conversation_id, doc_id)
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Document not found")
    return {"ok": True}


@app.patch("/api/conversations/{conversation_id}/title")
async def update_conversation_title_endpoint(conversation_id: str, request: dict):
    """
    Update the title of a conversation.

    The JSON body must contain a non-blank string under ``title``; surrounding
    whitespace is stripped before saving.

    Raises:
        HTTPException: 404 if the conversation does not exist; 400 if the
            title is missing, blank, or not a string.
    """
    _get_conversation_or_404(conversation_id)

    raw_title = request.get('title', '')
    if not isinstance(raw_title, str):
        # A null/number/object title would otherwise crash on .strip() with a 500.
        raise HTTPException(status_code=400, detail="Title must be a string")

    new_title = raw_title.strip()
    if not new_title:
        raise HTTPException(status_code=400, detail="Title cannot be empty")

    storage.update_conversation_title(conversation_id, new_title)
    return {"ok": True, "title": new_title}


@app.post("/api/conversations/{conversation_id}/message")
async def send_message(conversation_id: str, request: SendMessageRequest):
    """
    Send a message and run the 3-stage council process.

    Returns the complete response with all stages.
    """
    conversation = _get_conversation_or_404(conversation_id)

    # Must be evaluated before the user message is stored below.
    is_first_message = len(conversation["messages"]) == 0

    storage.add_user_message(conversation_id, request.content)

    # If this is the first message, generate a title
    if is_first_message:
        title = await generate_conversation_title(request.content)
        storage.update_conversation_title(conversation_id, title)

    # Run the 3-stage council process
    docs_text = build_docs_context(conversation_id, user_query=request.content)
    stage1_results, stage2_results, stage3_result, metadata = await run_full_council(
        request.content,
        docs_text=docs_text,
    )

    # Add assistant message with all stages
    storage.add_assistant_message(
        conversation_id,
        stage1_results,
        stage2_results,
        stage3_result,
        metadata,
    )

    return {
        "stage1": stage1_results,
        "stage2": stage2_results,
        "stage3": stage3_result,
        "metadata": metadata,
    }


@app.post("/api/conversations/{conversation_id}/message/stream")
async def send_message_stream(conversation_id: str, request: SendMessageRequest):
    """
    Send a message and stream the 3-stage council process.

    Returns Server-Sent Events as each stage completes. Event ``type`` values,
    in order: ``stage1_start``, one ``stage1_response`` or
    ``stage1_response_failed`` per model, ``stage1_complete``, ``stage2_start``,
    ``stage2_complete``, ``stage3_start``, ``stage3_complete``, optionally
    ``title_complete``, then ``complete``. Any exception raised while streaming
    is reported as a single ``error`` event.
    """
    conversation = _get_conversation_or_404(conversation_id)

    # Must be evaluated before the user message is stored by the generator.
    is_first_message = len(conversation["messages"]) == 0

    async def event_generator():
        # Read the config values when the request is served rather than
        # relying only on the names bound at module import time.
        from .config import COUNCIL_MODELS, OPENAI_COMPAT_BASE_URL, DEBUG

        model_tasks = []
        title_task = None
        try:
            storage.add_user_message(conversation_id, request.content)

            # Start title generation in parallel (don't await yet)
            if is_first_message:
                title_task = asyncio.create_task(generate_conversation_title(request.content))

            # Load docs context once per request
            docs_text = build_docs_context(conversation_id, user_query=request.content)

            # Stage 1: stream individual responses as they complete
            yield _sse({'type': 'stage1_start'})

            if DEBUG:
                print(f"[DEBUG] Stage 1: Querying {len(COUNCIL_MODELS)} models: {COUNCIL_MODELS}")
                print(f"[DEBUG] Using base URL: {OPENAI_COMPAT_BASE_URL}")

            start_time = time.time()
            prompt = f"{request.content}{_format_docs_context(docs_text)}"
            messages = [{"role": "user", "content": prompt}]

            stage1_results = []
            successful_models = []
            failed_models = []
            response_queue = asyncio.Queue()

            async def process_model(model: str):
                """Query one model and report the outcome on ``response_queue``."""
                # Everything stays inside the try block: the consumer loop
                # below waits for one queue item per model, so a failure must
                # still enqueue a result or the stream would hang.
                try:
                    if DEBUG:
                        print(f"[DEBUG] Processing model: {model}")
                    response = await query_model(
                        model,
                        messages,
                        timeout=LLM_TIMEOUT_SECONDS,
                        max_tokens_override=MAX_TOKENS,
                    )
                    if response is not None:
                        result = {"model": model, "response": response.get('content', '')}
                        await response_queue.put(('success', model, result))
                        if DEBUG:
                            print(f"[DEBUG] Model {model} succeeded")
                    else:
                        await response_queue.put(('failed', model, None))
                        if DEBUG:
                            print(f"[DEBUG] Model {model} failed (returned None)")
                except Exception as e:
                    await response_queue.put(('failed', model, None))
                    if DEBUG:
                        print(f"[DEBUG] Model {model} exception: {e}")

            model_tasks.extend(
                asyncio.create_task(process_model(model)) for model in COUNCIL_MODELS
            )

            # Process responses as they arrive (completion order, not model order)
            for _ in range(len(COUNCIL_MODELS)):
                status, model, result = await response_queue.get()
                if status == 'success':
                    stage1_results.append(result)
                    successful_models.append(model)
                    yield _sse({'type': 'stage1_response', 'model': model, 'response': result})
                else:
                    failed_models.append(model)
                    yield _sse({'type': 'stage1_response_failed', 'model': model})

            # Wait for all tasks to complete
            await asyncio.gather(*model_tasks, return_exceptions=True)

            duration = time.time() - start_time
            stage1_metadata = {
                "duration_seconds": round(duration, 2),
                "successful_models": successful_models,
                "failed_models": failed_models,
                "total_models": len(COUNCIL_MODELS),
            }
            yield _sse({'type': 'stage1_complete', 'data': stage1_results, 'metadata': stage1_metadata})

            # Stage 2: Collect rankings
            yield _sse({'type': 'stage2_start'})
            stage2_results, label_to_model, stage2_metadata = await stage2_collect_rankings(
                request.content, stage1_results, docs_text=docs_text
            )
            aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)
            yield _sse({
                'type': 'stage2_complete',
                'data': stage2_results,
                'metadata': {
                    'label_to_model': label_to_model,
                    'aggregate_rankings': aggregate_rankings,
                    'stage2_metadata': stage2_metadata,
                },
            })

            # Stage 3: Synthesize final answer
            yield _sse({'type': 'stage3_start'})
            stage3_result, stage3_metadata = await stage3_synthesize_final(
                request.content, stage1_results, stage2_results, docs_text=docs_text
            )
            yield _sse({'type': 'stage3_complete', 'data': stage3_result, 'metadata': stage3_metadata})

            # Wait for title generation if it was started
            if title_task:
                title = await title_task
                storage.update_conversation_title(conversation_id, title)
                yield _sse({'type': 'title_complete', 'data': {'title': title}})

            metadata = {
                "label_to_model": label_to_model,
                "aggregate_rankings": aggregate_rankings,
                "stage1_metadata": stage1_metadata,
                "stage2_metadata": stage2_metadata,
                "stage3_metadata": stage3_metadata,
            }

            # Save complete assistant message
            storage.add_assistant_message(
                conversation_id,
                stage1_results,
                stage2_results,
                stage3_result,
                metadata,
            )

            yield _sse({'type': 'complete'})

        except Exception as e:
            # Report the failure to the client as an SSE event; the traceback
            # is appended only in DEBUG mode.
            error_msg = str(e)
            if DEBUG:
                error_msg += f"\n{traceback.format_exc()}"
            print(f"[ERROR] Stream error: {error_msg}")
            yield _sse({'type': 'error', 'message': error_msg})
        finally:
            # If the client disconnected or a stage failed, do not leave model
            # or title requests running in the background.
            pending = list(model_tasks)
            if title_task is not None:
                pending.append(title_task)
            for task in pending:
                if not task.done():
                    task.cancel()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )


def _replace_doc_references(text, filenames_by_index: Dict[str, str]) -> str:
    """
    Replace ``@1`` / ``@ 1`` style references with the matching document filename.

    References whose number has no entry in ``filenames_by_index`` are left
    untouched. The substitution is a single pass with a callable replacement,
    so filenames are inserted literally and are never re-scanned.
    """
    if text is None:
        return ""

    def _substitute(match):
        return filenames_by_index.get(match.group(1), match.group(0))

    return _DOC_REFERENCE_RE.sub(_substitute, text)


def _format_created_at(created_at) -> str:
    """Return the "**Created:**" report line for a stored timestamp value."""
    try:
        dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
        if dt.tzinfo is not None:
            # The label below says UTC, so normalise aware timestamps to UTC.
            dt = dt.astimezone(timezone.utc)
        return f"**Created:** {dt.strftime('%Y-%m-%d %H:%M:%S UTC')}\n"
    except (AttributeError, TypeError, ValueError):
        # Not an ISO-8601 string: fall back to printing the raw value.
        return f"**Created:** {created_at}\n"


def _stage_stats_line(stage_meta: Dict[str, Any]) -> List[str]:
    """Return the "Duration | Successful" line for a stage, or nothing if no duration."""
    duration = stage_meta.get('duration_seconds', 0)
    if not duration:
        return []
    successful = len(stage_meta.get('successful_models', []))
    total = stage_meta.get('total_models', 0)
    return [f"*Duration: {duration}s | Successful: {successful}/{total} models*\n\n"]


def _render_assistant_message(msg: Dict[str, Any], msg_idx: int, doc_names: Dict[str, str]) -> List[str]:
    """Render one assistant (council) message as markdown report fragments."""
    lines = [f"## LLM Council Response {msg_idx}\n\n"]
    metadata = msg.get('metadata', {})

    # Stage 1
    if msg.get('stage1'):
        lines.append("### Stage 1: Individual Responses\n\n")
        lines.extend(_stage_stats_line(metadata.get('stage1_metadata', {})))
        for response in msg['stage1']:
            lines.append(f"#### {response['model']}\n\n")
            lines.append(f"{_replace_doc_references(response['response'], doc_names)}\n\n")
        lines.append("\n---\n\n")

    # Stage 2
    if msg.get('stage2'):
        lines.append("### Stage 2: Peer Rankings\n\n")
        lines.extend(_stage_stats_line(metadata.get('stage2_metadata', {})))
        for ranking in msg['stage2']:
            lines.append(f"#### {ranking['model']}\n\n")
            lines.append(f"{_replace_doc_references(ranking['ranking'], doc_names)}\n\n")

        # Add aggregate rankings if available
        agg_rankings = metadata.get('aggregate_rankings', [])
        if agg_rankings:
            lines.append("#### Aggregate Rankings\n\n")
            for item in agg_rankings:
                lines.append(f"- **{item['model']}**: Average rank {item['average_rank']:.2f}\n")
            lines.append("\n")
        lines.append("\n---\n\n")

    # Stage 3
    if msg.get('stage3'):
        stage3_meta = metadata.get('stage3_metadata', {})
        duration = stage3_meta.get('duration_seconds', 0)
        model = stage3_meta.get('model', msg['stage3'].get('model', 'Unknown'))
        lines.append("### Stage 3: Final Synthesis\n\n")
        if duration:
            lines.append(f"*Duration: {duration}s | Model: {model}*\n\n")
        lines.append(f"{_replace_doc_references(msg['stage3'].get('response', ''), doc_names)}\n\n")

    # Total duration
    total_duration = metadata.get('total_duration_seconds')
    if total_duration:
        lines.append(f"**Total processing time:** {total_duration}s\n\n")

    lines.append("---\n\n")
    return lines


def _export_filename(title, conversation_id: str) -> str:
    """
    Build an ASCII-only download filename from the title and conversation id.

    Non-ASCII characters are replaced too, because the name is sent in the
    Content-Disposition header.
    """
    raw_title = title or 'conversation'
    safe_title = "".join(
        c if (c.isascii() and c.isalnum()) or c in (' ', '-', '_') else '_'
        for c in raw_title
    )
    safe_title = safe_title[:50].strip() or 'conversation'  # Limit length
    return f"{safe_title}_{conversation_id[:8]}.md"


@app.get("/api/conversations/{conversation_id}/export")
async def export_conversation_report(conversation_id: str):
    """
    Export conversation as a markdown report file.

    ``@N`` references in message text are replaced with the filename of the
    N-th document returned by ``documents.list_documents`` (1-based).

    Returns a ``text/markdown`` attachment response.

    Raises:
        HTTPException: 404 if the conversation does not exist.
    """
    conversation = _get_conversation_or_404(conversation_id)

    # Map "1", "2", ... to filenames, in the order documents are listed.
    doc_names = {
        str(idx): doc.filename
        for idx, doc in enumerate(documents.list_documents(conversation_id), 1)
    }

    lines = [f"# {conversation.get('title') or 'Conversation'}\n"]

    # Add metadata
    created_at = conversation.get('created_at', '')
    if created_at:
        lines.append(_format_created_at(created_at))
    lines.append(f"**Conversation ID:** {conversation_id}\n")
    lines.append("\n---\n\n")

    # Add messages; the index counts every message, user and assistant alike.
    for msg_idx, msg in enumerate(conversation.get('messages', []), 1):
        role = msg.get('role')
        if role == 'user':
            lines.append(f"## User Message {msg_idx}\n\n")
            lines.append(f"{_replace_doc_references(msg['content'], doc_names)}\n\n")
            lines.append("---\n\n")
        elif role == 'assistant':
            lines.extend(_render_assistant_message(msg, msg_idx, doc_names))

    content = "".join(lines)
    filename = _export_filename(conversation.get('title'), conversation_id)

    return Response(
        content=content,
        media_type="text/markdown",
        headers={
            "Content-Disposition": f'attachment; filename="{filename}"'
        },
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8001)