#!/usr/bin/env python3
"""
Advanced connectivity test for Ansible hosts with fallback IP support.

Tests both primary and fallback IPs, provides detailed diagnostics, and
suggests fixes.
"""

import argparse
import contextlib
import json
import os
import re
import shlex
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple


class ConnectivityTester:
    """Ping/SSH prober for the hosts listed in an Ansible INI inventory.

    Each host line may carry ``ansible_host`` (primary address),
    ``ansible_host_fallback`` (secondary address) and ``ansible_user``.
    Every host is classified into a status string (``success``,
    ``ssh_key``, ``ssh_service``, ``ssh_error``, ``use_fallback``,
    ``fallback_ssh``, ``both_failed``, ``no_fallback`` or ``unknown``)
    together with a human-readable recommendation.
    """

    # Statuses where an address answers ping but the SSH login failed.
    _WARNING_STATUSES = ('ssh_key', 'ssh_service', 'ssh_error', 'fallback_ssh')

    def __init__(self, hosts_file: str, timeout: int = 3, ssh_timeout: int = 3):
        """Store the configuration; no I/O happens here.

        Args:
            hosts_file: Path to the inventory file to parse.
            timeout: Value passed to ``ping -W``; the ping subprocess itself
                is killed after ``timeout + 1`` seconds.
            ssh_timeout: Value passed to ssh as ``ConnectTimeout``; the ssh
                subprocess itself is killed after ``ssh_timeout + 2`` seconds.
        """
        self.hosts_file = Path(hosts_file)
        self.timeout = timeout
        self.ssh_timeout = ssh_timeout
        self.results = {}

    def test_ping(self, ip: str) -> bool:
        """Test if host is reachable via ping.

        Returns ``False`` when ping exits non-zero, when the subprocess
        times out, or when the ``ping`` executable cannot be found.
        """
        try:
            result = subprocess.run(
                ['ping', '-c', '1', '-W', str(self.timeout), ip],
                capture_output=True,
                timeout=self.timeout + 1,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError):
            return False
        return result.returncode == 0

    def test_ssh(self, hostname: str, ip: str, user: str) -> Tuple[bool, str]:
        """Test SSH connectivity and return (success, error_message).

        Runs ``ssh user@ip exit`` in batch mode so that it never waits for
        interactive input. ``hostname`` is accepted for interface
        compatibility and is not used.

        Returns:
            ``(True, "")`` on exit code 0; otherwise ``(False, message)``
            where ``message`` is ssh's stripped stderr, or the string form
            of the exception when ssh timed out or is not installed.
        """
        try:
            result = subprocess.run(
                [
                    'ssh',
                    '-o', f'ConnectTimeout={self.ssh_timeout}',
                    '-o', 'BatchMode=yes',
                    f'{user}@{ip}',
                    'exit',
                ],
                capture_output=True,
                # Leave ssh a little headroom beyond its own connect timeout.
                timeout=self.ssh_timeout + 2,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError) as e:
            return False, str(e)

        if result.returncode == 0:
            return True, ""
        # errors='replace': a non-UTF-8 byte in stderr must not crash the run.
        return False, result.stderr.decode(errors='replace').strip()

    def parse_hosts_file(self) -> List[Dict]:
        """Parse hosts file and return structured host data.

        Only lines that appear after a ``[group]`` header and contain
        ``ansible_host=`` are considered; blank lines and lines starting
        with ``#`` are skipped.

        Raises:
            OSError: If the hosts file cannot be opened.
        """
        hosts = []
        current_group = None

        with open(self.hosts_file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()

                # Skip empty lines and comments
                if not line or line.startswith('#'):
                    continue

                # Group header
                if line.startswith('[') and line.endswith(']'):
                    current_group = line[1:-1]
                    continue

                # Host entry
                if current_group and 'ansible_host=' in line:
                    host_data = self._parse_host_line(line, current_group)
                    if host_data:
                        hosts.append(host_data)

        return hosts

    def _parse_host_line(self, line: str, group: str) -> Optional[Dict]:
        """Parse a single host line and return host data.

        Returns ``None`` for an empty line or for a line whose first token
        is itself a ``key=value`` pair (a variable assignment rather than a
        host entry). Tokens from the first one starting with ``#`` onwards
        are treated as an inline comment and ignored.
        """
        parts = line.split()
        if not parts:
            return None

        hostname = parts[0]
        if '=' in hostname:
            # e.g. "ansible_host=..." inside a [group:vars] section.
            return None

        attrs = {}
        for part in parts[1:]:
            if part.startswith('#'):
                # Inline comment: a commented-out attribute must not go live.
                break
            if '=' in part:
                key, value = part.split('=', 1)
                attrs[key] = value

        return {
            'hostname': hostname,
            'group': group,
            'primary_ip': attrs.get('ansible_host', ''),
            'fallback_ip': attrs.get('ansible_host_fallback', ''),
            'user': attrs.get('ansible_user', 'root'),
            'original_line': line,
        }

    def test_host(self, host_data: Dict) -> Dict:
        """Test connectivity for a single host.

        SSH is only attempted against an address that answered ping. The
        fallback address, when defined, is always probed, even if the
        primary one works.

        Args:
            host_data: A dict as produced by ``_parse_host_line``.

        Returns:
            A flat result dict with the probe outcomes plus ``status`` and
            ``recommendation``.
        """
        hostname = host_data['hostname']
        primary_ip = host_data['primary_ip']
        fallback_ip = host_data['fallback_ip']
        user = host_data['user']

        result = {
            'hostname': hostname,
            'group': host_data['group'],
            'primary_ip': primary_ip,
            'fallback_ip': fallback_ip,
            'user': user,
            'primary_ping': False,
            'primary_ssh': False,
            'fallback_ping': False,
            'fallback_ssh': False,
            'primary_ssh_error': '',
            'fallback_ssh_error': '',
            'recommendation': '',
            'status': 'unknown',
        }

        # Test primary IP
        if primary_ip:
            result['primary_ping'] = self.test_ping(primary_ip)
            if result['primary_ping']:
                ssh_success, ssh_error = self.test_ssh(hostname, primary_ip, user)
                result['primary_ssh'] = ssh_success
                result['primary_ssh_error'] = ssh_error

        # Test fallback IP
        if fallback_ip:
            result['fallback_ping'] = self.test_ping(fallback_ip)
            if result['fallback_ping']:
                ssh_success, ssh_error = self.test_ssh(hostname, fallback_ip, user)
                result['fallback_ssh'] = ssh_success
                result['fallback_ssh_error'] = ssh_error

        # Determine status and recommendation
        result['status'], result['recommendation'] = self._analyze_connectivity(result)

        return result

    def _analyze_connectivity(self, result: Dict) -> Tuple[str, str]:
        """Analyze connectivity results and provide recommendations.

        Split into smaller helpers to keep this function's complexity low
        while preserving the original decision logic. The handlers are tried
        in order and the first one returning a non-``None`` outcome wins.

        Returns:
            ``(status, recommendation)``.
        """
        for handler in (
            self._handle_primary_success,
            self._handle_primary_ping_only,
            self._handle_fallback_path,
            self._handle_no_fallback,
        ):
            outcome = handler(result)
            if outcome is not None:
                return outcome

        hostname = result["hostname"]
        return "unknown", f"? {hostname}: Unknown connectivity state"

    def _handle_primary_success(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Handle case where primary IP works perfectly."""
        if result.get("primary_ping") and result.get("primary_ssh"):
            hostname = result["hostname"]
            primary_ip = result["primary_ip"]
            return "success", f"✓ {hostname} is fully accessible via primary IP {primary_ip}"
        return None

    def _handle_primary_ping_only(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Handle cases where primary ping works but SSH fails.

        The SSH error text is matched against known substrings to tell a
        key problem from a stopped SSH service; anything else is reported
        verbatim as a generic SSH error.
        """
        if result.get("primary_ping") and not result.get("primary_ssh"):
            hostname = result["hostname"]
            primary_ip = result["primary_ip"]
            error = result.get("primary_ssh_error", "")

            if "Permission denied" in error:
                return (
                    "ssh_key",
                    f"⚠ {hostname}: SSH key issue on {primary_ip} - run: make copy-ssh-key HOST={hostname}",
                )
            if "Connection refused" in error:
                return "ssh_service", f"⚠ {hostname}: SSH service not running on {primary_ip}"
            return "ssh_error", f"⚠ {hostname}: SSH error on {primary_ip} - {error}"
        return None

    def _handle_fallback_path(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Handle cases where primary fails and a fallback IP is defined."""
        if result.get("primary_ping"):
            return None

        fallback_ip = result.get("fallback_ip")
        if not fallback_ip:
            return None

        hostname = result["hostname"]
        primary_ip = result["primary_ip"]

        if result.get("fallback_ping") and result.get("fallback_ssh"):
            return (
                "use_fallback",
                f"→ {hostname}: Switch to fallback IP {fallback_ip} (primary {primary_ip} failed)",
            )
        if result.get("fallback_ping") and not result.get("fallback_ssh"):
            return (
                "fallback_ssh",
                f"⚠ {hostname}: Fallback IP {fallback_ip} reachable but SSH failed",
            )
        return (
            "both_failed",
            f"✗ {hostname}: Both primary {primary_ip} and fallback {fallback_ip} failed",
        )

    def _handle_no_fallback(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Handle cases where primary failed and no fallback IP is available."""
        if result.get("primary_ping"):
            return None

        fallback_ip = result.get("fallback_ip")
        if fallback_ip:
            return None

        hostname = result["hostname"]
        primary_ip = result["primary_ip"]
        return "no_fallback", f"✗ {hostname}: Primary IP {primary_ip} failed, no fallback available"

    def run_tests(self) -> List[Dict]:
        """Run connectivity tests for all hosts.

        Prints one progress line per host with a status icon and returns
        the list of per-host result dicts in inventory order.
        """
        hosts = self.parse_hosts_file()
        results = []

        print("🔍 Testing host connectivity...")
        print("=" * 60)

        for host_data in hosts:
            print(f"Testing {host_data['hostname']}...", end=' ')
            result = self.test_host(host_data)
            results.append(result)

            # Print immediate status
            status = result['status']
            if status == 'success':
                print("✅")
            elif status in self._WARNING_STATUSES:
                print("⚠️")
            elif status == 'use_fallback':
                print("🔄")
            else:
                print("❌")

        return results

    def print_summary(self, results: List[Dict]):
        """Print detailed summary of connectivity test results.

        Hosts are grouped by status, in order of each status' first
        appearance, followed by the actionable recommendations.
        """
        print("\n📊 CONNECTIVITY SUMMARY")
        print("=" * 60)

        # Group results by status
        by_status = defaultdict(list)
        for result in results:
            by_status[result['status']].append(result)

        # Print each status group
        status_icons = {
            'success': '✅',
            'ssh_key': '🔑',
            'ssh_service': '🔧',
            'ssh_error': '⚠️',
            'use_fallback': '🔄',
            'fallback_ssh': '⚠️',
            'both_failed': '❌',
            'no_fallback': '🚫',
            'unknown': '❓',
        }

        for status, hosts in by_status.items():
            icon = status_icons.get(status, '❓')
            print(f"\n{icon} {status.upper().replace('_', ' ')} ({len(hosts)} hosts)")
            print("-" * 40)

            for result in hosts:
                print(f" {result['hostname']:<20} {result['recommendation']}")

        # Print actionable recommendations
        self._print_recommendations(results)

    def _print_recommendations(self, results: List[Dict]):
        """Print actionable recommendations based on test results."""
        print("\n🛠️ RECOMMENDATIONS")
        print("=" * 60)

        # SSH key issues
        ssh_key_issues = [r for r in results if r['status'] == 'ssh_key']
        if ssh_key_issues:
            print(f"\n🔑 Fix SSH key issues ({len(ssh_key_issues)} hosts):")
            for result in ssh_key_issues:
                print(f" make copy-ssh-key HOST={result['hostname']}")

        # Fallback recommendations
        fallback_needed = [r for r in results if r['status'] == 'use_fallback']
        if fallback_needed:
            print(f"\n🔄 Switch to fallback IPs ({len(fallback_needed)} hosts):")
            # Quote the path so the suggested command survives copy/paste
            # even when it contains spaces or shell metacharacters.
            quoted_path = shlex.quote(str(self.hosts_file))
            for result in fallback_needed:
                hostname = result['hostname']
                old_entry = f"{hostname} ansible_host={result['primary_ip']}"
                new_entry = f"{hostname} ansible_host={result['fallback_ip']}"
                print(f" sed -i 's/{old_entry}/{new_entry}/' {quoted_path}")

        # Critical issues
        critical_issues = [r for r in results if r['status'] in ['both_failed', 'no_fallback']]
        if critical_issues:
            print(f"\n🚨 Critical issues ({len(critical_issues)} hosts):")
            for result in critical_issues:
                print(f" {result['hostname']}: {result['recommendation']}")

        # Auto-fallback suggestion
        if fallback_needed:
            print("\n🤖 Or run auto-fallback to fix automatically:")
            print(" make auto-fallback")

    def export_json(self, results: List[Dict], output_file: str):
        """Export results to JSON file.

        Raises:
            OSError: If ``output_file`` cannot be opened for writing.
        """
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        print(f"\n📄 Results exported to: {output_file}")


def main():
    """Command-line entry point: parse arguments, run the tests, report.

    Exits with status 1 when the hosts file does not exist.
    """
    parser = argparse.ArgumentParser(description='Advanced connectivity test for Ansible hosts')
    parser.add_argument('--hosts-file', default='inventories/production/hosts',
                        help='Path to hosts file')
    parser.add_argument('--timeout', type=int, default=3,
                        help='Ping timeout in seconds')
    parser.add_argument('--ssh-timeout', type=int, default=3,
                        help='SSH connect timeout in seconds')
    parser.add_argument('--json', help='Export results to JSON file')
    parser.add_argument('--quiet', action='store_true',
                        help='Only show summary, not individual tests')

    args = parser.parse_args()

    if not Path(args.hosts_file).exists():
        print(f"❌ Error: Hosts file not found: {args.hosts_file}")
        sys.exit(1)

    tester = ConnectivityTester(args.hosts_file, args.timeout, args.ssh_timeout)

    if args.quiet:
        # Suppress individual test output. redirect_stdout restores
        # sys.stdout even if run_tests() raises.
        with open(os.devnull, 'w') as devnull, contextlib.redirect_stdout(devnull):
            results = tester.run_tests()
    else:
        results = tester.run_tests()

    tester.print_summary(results)

    if args.json:
        tester.export_json(results, args.json)


if __name__ == '__main__':
    main()