"""3-stage LLM Council orchestration.

Stage 1: every council model answers the user's question independently.
Stage 2: every council model evaluates and ranks the anonymized Stage 1 answers.
Stage 3: the chairman model synthesizes a final answer from Stages 1 and 2.
"""

import asyncio
import re
import time
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

from .llm_client import query_models_parallel, query_model
from .config import COUNCIL_MODELS, CHAIRMAN_MODEL, CHAIRMAN_TIMEOUT_SECONDS, TITLE_GENERATION_TIMEOUT_SECONDS


# Stage 3 prompt budget, in characters. Long inputs are truncated to avoid
# exceeding the chairman model's token/context limits (~4 chars per token, so
# the total cap keeps the prompt around ~2000 tokens).
_MAX_RESPONSE_LENGTH = 2000  # per Stage 1 response
_MAX_RANKING_LENGTH = 1000  # per Stage 2 ranking
_MAX_DOCS_LENGTH = 2000  # for the reference-documents section
_MAX_TOTAL_PROMPT_LENGTH = 8000  # hard cap on the whole chairman prompt

# Marker that introduces the machine-readable ranking in a Stage 2 answer.
_FINAL_RANKING_MARKER = "FINAL RANKING:"
# An anonymized label such as "Response A". The trailing \b prevents matching
# the start of an ordinary word, e.g. "Response Also" -> "Response A".
_LABEL_PATTERN = re.compile(r"Response [A-Z]\b")
# A numbered ranking entry such as "1. Response A"; group 1 is the label.
_NUMBERED_LABEL_PATTERN = re.compile(r"\d+\.\s*(Response [A-Z])\b")


def _format_docs_context(docs_text: Optional[str]) -> str:
    """Render user-provided reference documents as a prompt section.

    Returns an empty string when ``docs_text`` is None, empty, or whitespace.
    """
    if not docs_text or not docs_text.strip():
        return ""
    return (
        "\n\nREFERENCE DOCUMENTS (user-provided markdown):\n"
        "Use these as additional context if relevant. Quote sparingly and cite sections when helpful.\n"
        f"{docs_text.strip()}\n"
    )


def _response_text(response: Dict[str, Any]) -> str:
    """Return the text content of a model response dict, or "" if it has none.

    ``response.get('content', '')`` alone is not enough: when the key is present
    but null it yields None, which would print as "None" in downstream prompts
    and crash ``len()``/``.strip()``.
    """
    return response.get("content") or ""


def _truncate_text(text: Optional[str], max_length: int) -> str:
    """Truncate ``text`` to ``max_length`` characters, ending with "..." if cut.

    None is treated as an empty string.
    """
    text = text or ""
    if len(text) <= max_length:
        return text
    return text[:max_length - 3] + "..."


def _stage_metadata(duration: float, successful_models: List[str], failed_models: List[str]) -> Dict[str, Any]:
    """Build the timing/success metadata dict shared by Stages 1 and 2."""
    return {
        "duration_seconds": round(duration, 2),
        "successful_models": successful_models,
        "failed_models": failed_models,
        "total_models": len(COUNCIL_MODELS),
    }


def _collect_stage1_results(
    responses: Dict[str, Optional[Dict[str, Any]]],
    duration: float,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Turn raw ``{model: response-or-None}`` results into Stage 1 output.

    Only successful (non-None) responses become result entries; failed models
    are listed in the metadata.
    """
    results: List[Dict[str, Any]] = []
    successful_models: List[str] = []
    failed_models: List[str] = []
    for model, response in responses.items():
        if response is None:
            failed_models.append(model)
            continue
        results.append({"model": model, "response": _response_text(response)})
        successful_models.append(model)
    return results, _stage_metadata(duration, successful_models, failed_models)


async def stage1_collect_responses(user_query: str, docs_text: Optional[str] = None) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Stage 1: Collect individual responses from all council models.

    Args:
        user_query: The user's question
        docs_text: Optional reference documents appended to the prompt

    Returns:
        Tuple of (results list, metadata dict with timing info). Each result is
        ``{"model": ..., "response": ...}``; failed models are omitted from the
        list and reported under ``metadata["failed_models"]``.
    """
    start_time = time.perf_counter()

    prompt = f"{user_query}{_format_docs_context(docs_text)}"
    messages = [{"role": "user", "content": prompt}]

    responses = await query_models_parallel(COUNCIL_MODELS, messages)

    return _collect_stage1_results(responses, time.perf_counter() - start_time)


async def stage1_collect_responses_streaming(
    user_query: str,
    docs_text: Optional[str] = None,
    on_response: Optional[Callable[[str, Dict[str, Any]], Awaitable[Any]]] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Stage 1: Collect individual responses from all council models, streaming as they complete.

    Args:
        user_query: The user's question
        docs_text: Optional document context
        on_response: Optional async callback(model, response_dict) awaited as
            each successful response completes (in completion order)

    Returns:
        Tuple of (results list, metadata dict with timing info). The results
        list is in COUNCIL_MODELS order (not completion order) so that Stage 2
        labels are deterministic across runs.
    """
    # Imported lazily, as before, so a missing setting only affects this path.
    from .llm_client import LLM_TIMEOUT_SECONDS, MAX_TOKENS

    start_time = time.perf_counter()

    prompt = f"{user_query}{_format_docs_context(docs_text)}"
    messages = [{"role": "user", "content": prompt}]

    async def query_and_notify(model: str):
        response = await query_model(model, messages, timeout=LLM_TIMEOUT_SECONDS, max_tokens_override=MAX_TOKENS)
        if on_response and response is not None:
            await on_response(model, {"model": model, "response": _response_text(response)})
        return model, response

    # Run all queries concurrently; as_completed lets the callback fire as
    # soon as each model finishes rather than after the slowest one.
    completed: Dict[str, Optional[Dict[str, Any]]] = {}
    for next_done in asyncio.as_completed([query_and_notify(model) for model in COUNCIL_MODELS]):
        model, response = await next_done
        completed[model] = response

    duration = time.perf_counter() - start_time

    ordered = {model: completed[model] for model in COUNCIL_MODELS}
    return _collect_stage1_results(ordered, duration)


async def stage2_collect_rankings(
    user_query: str,
    stage1_results: List[Dict[str, Any]],
    docs_text: Optional[str] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, str], Dict[str, Any]]:
    """
    Stage 2: Each model ranks the anonymized responses.

    Args:
        user_query: The original user query
        stage1_results: Results from Stage 1
        docs_text: Optional reference documents appended to the prompt

    Returns:
        Tuple of (rankings list, label_to_model mapping, metadata dict with timing).
        Each ranking is ``{"model", "ranking" (full text), "parsed_ranking"}``.
    """
    start_time = time.perf_counter()

    if not stage1_results:
        return [], {}, _stage_metadata(0.0, [], [])

    # Anonymize as "Response A", "Response B", ... so rankers judge content,
    # not model names. NOTE: labels past "Response Z" (more than 26 responses)
    # would not be recognized by parse_ranking_from_text.
    labels = [f"Response {chr(ord('A') + i)}" for i in range(len(stage1_results))]
    label_to_model = {label: result["model"] for label, result in zip(labels, stage1_results)}

    responses_text = "\n\n".join(
        f"{label}:\n{result['response']}" for label, result in zip(labels, stage1_results)
    )

    ranking_prompt = f"""You are evaluating different responses to the following question:

Question: {user_query}
{_format_docs_context(docs_text)}
Here are the responses from different models (anonymized):

{responses_text}

Your task:
1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly.
2. Then, at the very end of your response, provide a final ranking.

IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows:
- Start with the line "FINAL RANKING:" (all caps, with colon)
- Then list the responses from best to worst as a numbered list
- Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A")
- Do not add any other text or explanations in the ranking section

Example of the correct format for your ENTIRE response:

Response A provides good detail on X but misses Y...
Response B is accurate but lacks depth on Z...
Response C offers the most comprehensive answer...

FINAL RANKING:
1. Response C
2. Response A
3. Response B

Now provide your evaluation and ranking:"""

    messages = [{"role": "user", "content": ranking_prompt}]

    responses = await query_models_parallel(COUNCIL_MODELS, messages)

    duration = time.perf_counter() - start_time

    stage2_results: List[Dict[str, Any]] = []
    successful_models: List[str] = []
    failed_models: List[str] = []
    for model, response in responses.items():
        if response is None:
            failed_models.append(model)
            continue
        full_text = _response_text(response)
        stage2_results.append({
            "model": model,
            "ranking": full_text,
            "parsed_ranking": parse_ranking_from_text(full_text),
        })
        successful_models.append(model)

    return stage2_results, label_to_model, _stage_metadata(duration, successful_models, failed_models)


async def stage3_synthesize_final(
    user_query: str,
    stage1_results: List[Dict[str, Any]],
    stage2_results: List[Dict[str, Any]],
    docs_text: Optional[str] = None,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Stage 3: Chairman synthesizes final response.

    Args:
        user_query: The original user query
        stage1_results: Individual model responses from Stage 1
        stage2_results: Rankings from Stage 2
        docs_text: Optional reference documents (truncated for this prompt)

    Returns:
        Tuple of (result dict with 'model' and 'response' keys, metadata dict
        with timing and a 'success' flag). On failure the 'response' holds a
        human-readable error message instead of raising.
    """
    start_time = time.perf_counter()

    if not stage1_results:
        duration = time.perf_counter() - start_time
        return {
            "model": CHAIRMAN_MODEL,
            "response": "Error: Cannot synthesize final answer - no responses from Stage 1.",
        }, {
            "duration_seconds": round(duration, 2),
            "model": CHAIRMAN_MODEL,
            "success": False,
        }

    # Per-section truncation keeps the chairman prompt within budget.
    truncated_docs_text = _truncate_text(docs_text, _MAX_DOCS_LENGTH) if docs_text else docs_text

    stage1_text = "\n\n".join(
        f"Model: {result['model']}\nResponse: {_truncate_text(result['response'], _MAX_RESPONSE_LENGTH)}"
        for result in stage1_results
    )

    if stage2_results:
        stage2_text = "\n\n".join(
            f"Model: {result['model']}\nRanking: {_truncate_text(result['ranking'], _MAX_RANKING_LENGTH)}"
            for result in stage2_results
        )
    else:
        stage2_text = "No rankings available from Stage 2."

    chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses.

Original Question: {user_query}
{_format_docs_context(truncated_docs_text)}
STAGE 1 - Individual Responses:
{stage1_text}

STAGE 2 - Peer Rankings:
{stage2_text}

Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider:
- The individual responses and their insights
- The peer rankings and what they reveal about response quality
- Any patterns of agreement or disagreement

Provide a clear, well-reasoned final answer that represents the council's collective wisdom:"""

    if len(chairman_prompt) > _MAX_TOTAL_PROMPT_LENGTH:
        # Last-resort cap: cut the prompt and re-append a short instruction so
        # the chairman still knows what to produce. NOTE: with many council
        # models this can drop trailing Stage 1 / Stage 2 content entirely.
        chairman_prompt = (
            chairman_prompt[:_MAX_TOTAL_PROMPT_LENGTH - 100]
            + "\n\n[Content truncated due to length limits...]\n\nProvide a clear, well-reasoned final answer:"
        )

    messages = [{"role": "user", "content": chairman_prompt}]

    # No max_tokens_override is passed, so llm_client's default output limit applies.
    response = await query_model(CHAIRMAN_MODEL, messages, timeout=CHAIRMAN_TIMEOUT_SECONDS)

    duration = time.perf_counter() - start_time

    if response is None:
        estimated_tokens = len(chairman_prompt) // 4  # rough: ~4 chars per token
        error_msg = (
            "Error: Unable to generate final synthesis.\n\n"
            "The chairman model failed to respond. Possible causes:\n"
            "- Model '{}' not available on the server\n"
            "- Server not running or unreachable\n"
            "- Network/API errors\n"
            "- Prompt too long (estimated ~{} tokens)\n"
            "- Server timeout or overloaded\n\n"
            "Check the backend terminal logs for the exact error message."
        ).format(CHAIRMAN_MODEL, estimated_tokens)
        return {
            "model": CHAIRMAN_MODEL,
            "response": error_msg,
        }, {
            "duration_seconds": round(duration, 2),
            "model": CHAIRMAN_MODEL,
            "success": False,
        }

    return {
        "model": CHAIRMAN_MODEL,
        "response": _response_text(response),
    }, {
        "duration_seconds": round(duration, 2),
        "model": CHAIRMAN_MODEL,
        "success": True,
    }


def parse_ranking_from_text(ranking_text: str) -> List[str]:
    """
    Parse the FINAL RANKING section from the model's response.

    The section after the LAST "FINAL RANKING:" marker is used (the prompt asks
    for the ranking at the very end). Within it, a numbered list ("1. Response A")
    is preferred; otherwise every "Response X" label in the section is taken in
    order. Without a marker, every label in the whole text is taken in order.
    Duplicate labels are dropped, keeping the first occurrence.

    Args:
        ranking_text: The full text response from the model

    Returns:
        List of response labels in ranked order (possibly empty)
    """
    if not ranking_text:
        return []

    if _FINAL_RANKING_MARKER in ranking_text:
        section = ranking_text.rpartition(_FINAL_RANKING_MARKER)[2]
        labels = _NUMBERED_LABEL_PATTERN.findall(section) or _LABEL_PATTERN.findall(section)
    else:
        labels = _LABEL_PATTERN.findall(ranking_text)

    # A label appearing twice (common in the free-text fallback) must not give
    # one model two positions in the same ranking.
    return list(dict.fromkeys(labels))


def calculate_aggregate_rankings(
    stage2_results: List[Dict[str, Any]],
    label_to_model: Dict[str, str]
) -> List[Dict[str, Any]]:
    """
    Calculate aggregate rankings across all models.

    Each Stage 2 ranking text is re-parsed; labels not present in
    ``label_to_model`` are ignored before positions are assigned, so a
    hallucinated label does not push real responses down.

    Args:
        stage2_results: Rankings from each model
        label_to_model: Mapping from anonymous labels to model names

    Returns:
        List of dicts with model name, average rank and number of rankings,
        sorted best (lowest average rank) to worst
    """
    model_positions: Dict[str, List[int]] = defaultdict(list)

    for ranking in stage2_results:
        known_labels = [
            label for label in parse_ranking_from_text(ranking["ranking"]) if label in label_to_model
        ]
        for position, label in enumerate(known_labels, start=1):
            model_positions[label_to_model[label]].append(position)

    aggregate = [
        {
            "model": model,
            "average_rank": round(sum(positions) / len(positions), 2),
            "rankings_count": len(positions),
        }
        for model, positions in model_positions.items()
        if positions
    ]

    # Lower average rank is better; sort is stable for ties.
    aggregate.sort(key=lambda entry: entry["average_rank"])
    return aggregate


async def generate_conversation_title(user_query: str) -> str:
    """
    Generate a short title for a conversation based on the first user message.

    Args:
        user_query: The first user message

    Returns:
        A short title (at most 50 characters). Falls back to "New Conversation"
        if the model fails or returns nothing usable.
    """
    title_prompt = f"""Generate a very short title (3-5 words maximum) that summarizes the following question.
The title should be concise and descriptive. Do not use quotes or punctuation in the title.

Question: {user_query}

Title:"""

    messages = [{"role": "user", "content": title_prompt}]

    # The timeout is configurable because local models may need longer on
    # their first (model-loading) request.
    response = await query_model(CHAIRMAN_MODEL, messages, timeout=TITLE_GENERATION_TIMEOUT_SECONDS, max_tokens_override=50)

    if response is None:
        return "New Conversation"

    # Models often wrap the title in quotes despite the instruction.
    title = _response_text(response).strip().strip('"\'').strip()
    if not title:
        return "New Conversation"

    if len(title) > 50:
        title = title[:47] + "..."

    return title


async def run_full_council(user_query: str, docs_text: Optional[str] = None) -> Tuple[List, List, Dict, Dict]:
    """
    Run the complete 3-stage council process.

    Args:
        user_query: The user's question
        docs_text: Optional reference documents passed to every stage

    Returns:
        Tuple of (stage1_results, stage2_results, stage3_result, metadata).
        If no Stage 1 model succeeds, Stages 2 and 3 are skipped and
        stage3_result carries an error message under model "error".
    """
    total_start_time = time.perf_counter()

    stage1_results, stage1_metadata = await stage1_collect_responses(user_query, docs_text=docs_text)

    if not stage1_results:
        error_msg = (
            "All models failed to respond. This could be due to:\n"
            "- Server not running or unreachable\n"
            "- Model names not available on the server\n"
            "- Network/API errors\n"
            "- Server timeout or overloaded\n"
            "- Invalid OPENAI_COMPAT_BASE_URL configuration\n\n"
            "Check the backend logs for detailed error messages."
        )
        total_duration = time.perf_counter() - total_start_time
        return [], [], {
            "model": "error",
            "response": error_msg,
        }, {
            "label_to_model": {},
            # A list, matching the type calculate_aggregate_rankings returns.
            "aggregate_rankings": [],
            "stage1_metadata": stage1_metadata,
            "stage2_metadata": {},
            "stage3_metadata": {},
            "total_duration_seconds": round(total_duration, 2),
        }

    stage2_results, label_to_model, stage2_metadata = await stage2_collect_rankings(
        user_query, stage1_results, docs_text=docs_text
    )

    aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)

    stage3_result, stage3_metadata = await stage3_synthesize_final(
        user_query,
        stage1_results,
        stage2_results,
        docs_text=docs_text,
    )

    total_duration = time.perf_counter() - total_start_time

    metadata = {
        "label_to_model": label_to_model,
        "aggregate_rankings": aggregate_rankings,
        "stage1_metadata": stage1_metadata,
        "stage2_metadata": stage2_metadata,
        "stage3_metadata": stage3_metadata,
        "total_duration_seconds": round(total_duration, 2),
    }

    return stage1_results, stage2_results, stage3_result, metadata