#!/usr/bin/env python3
"""
Advanced connectivity test for Ansible hosts with fallback IP support.
Tests both primary and fallback IPs, provides detailed diagnostics, and suggests fixes.
"""

import argparse
import contextlib
import json
import os
import re
import shlex
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Tuple, Optional


class ConnectivityTester:
    """Probe the hosts of an INI-style Ansible inventory via ping and SSH.

    Each host line may carry ``ansible_host`` (primary IP),
    ``ansible_host_fallback`` (fallback IP) and ``ansible_user`` attributes.
    Every host is classified into a status string and a human-readable
    recommendation.
    """

    # Seconds passed to ``ssh -o ConnectTimeout=`` and the hard limit on the
    # whole ssh subprocess, respectively.
    SSH_CONNECT_TIMEOUT = 3
    SSH_PROCESS_TIMEOUT = 5

    # Icon shown next to each status group in the summary.
    STATUS_ICONS = {
        'success': '✅',
        'ssh_key': '🔑',
        'ssh_service': '🔧',
        'ssh_error': '⚠️',
        'use_fallback': '🔄',
        'fallback_ssh': '⚠️',
        'both_failed': '❌',
        'no_fallback': '🚫',
        'unknown': '❓',
    }

    # Statuses listed under "Critical issues": the host cannot be managed
    # through either address.
    CRITICAL_STATUSES = ('both_failed', 'no_fallback', 'fallback_ssh')

    def __init__(self, hosts_file: str, timeout: int = 3):
        """Store the inventory path and the ping timeout (in seconds)."""
        self.hosts_file = Path(hosts_file)
        self.timeout = timeout
        self.results = {}

    def test_ping(self, ip: str) -> bool:
        """Test if host is reachable via ping.

        Returns False when ping fails, exceeds the timeout, or the ``ping``
        executable cannot be found.
        """
        try:
            result = subprocess.run(
                ['ping', '-c', '1', '-W', str(self.timeout), ip],
                capture_output=True,
                # Give the process one extra second beyond ping's own timeout.
                timeout=self.timeout + 1,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError):
            return False
        return result.returncode == 0

    def test_ssh(self, hostname: str, ip: str, user: str) -> Tuple[bool, str]:
        """Test SSH connectivity and return (success, error_message).

        ``hostname`` is accepted for interface symmetry but is not used; the
        connection is made to ``user@ip``. BatchMode disables interactive
        prompts so a missing key fails instead of waiting for a password.
        """
        command = [
            'ssh',
            '-o', f'ConnectTimeout={self.SSH_CONNECT_TIMEOUT}',
            '-o', 'BatchMode=yes',
            f'{user}@{ip}',
            'exit',
        ]
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                timeout=self.SSH_PROCESS_TIMEOUT,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError) as e:
            return False, str(e)

        if result.returncode == 0:
            return True, ""
        # stderr may not be valid UTF-8 (e.g. localized banners); never let a
        # decoding problem abort the whole test run.
        return False, result.stderr.decode(errors='replace').strip()

    def parse_hosts_file(self) -> List[Dict]:
        """Parse hosts file and return structured host data.

        Only lines that contain ``ansible_host=`` and appear after a
        ``[group]`` header are treated as host entries; blank lines and
        lines starting with ``#`` are skipped.
        """
        hosts = []
        current_group = None

        with open(self.hosts_file, 'r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()

                # Skip empty lines and comments
                if not line or line.startswith('#'):
                    continue

                # Group header
                if line.startswith('[') and line.endswith(']'):
                    current_group = line[1:-1]
                    continue

                # Host entry
                if current_group and 'ansible_host=' in line:
                    host_data = self._parse_host_line(line, current_group)
                    if host_data:
                        hosts.append(host_data)

        return hosts

    def _parse_host_line(self, line: str, group: str) -> Optional[Dict]:
        """Parse a single host line and return host data.

        The first whitespace-separated token is the hostname; remaining
        ``key=value`` tokens are attributes. Returns None for a blank line.
        The user defaults to ``root`` and missing IPs default to ``''``.
        """
        parts = line.split()
        if not parts:
            return None

        hostname = parts[0]
        attrs = {}
        for part in parts[1:]:
            if '=' in part:
                key, value = part.split('=', 1)
                attrs[key] = value

        return {
            'hostname': hostname,
            'group': group,
            'primary_ip': attrs.get('ansible_host', ''),
            'fallback_ip': attrs.get('ansible_host_fallback', ''),
            'user': attrs.get('ansible_user', 'root'),
            'original_line': line,
        }

    def test_host(self, host_data: Dict) -> Dict:
        """Test connectivity for a single host.

        SSH is attempted on an address only if that address answered ping.
        Returns a result dict with the probe outcomes plus ``status`` and
        ``recommendation``.
        """
        hostname = host_data['hostname']
        primary_ip = host_data['primary_ip']
        fallback_ip = host_data['fallback_ip']
        user = host_data['user']

        result = {
            'hostname': hostname,
            'group': host_data['group'],
            'primary_ip': primary_ip,
            'fallback_ip': fallback_ip,
            'user': user,
            'primary_ping': False,
            'primary_ssh': False,
            'fallback_ping': False,
            'fallback_ssh': False,
            'primary_ssh_error': '',
            'fallback_ssh_error': '',
            'recommendation': '',
            'status': 'unknown',
        }

        # Probe the primary address first, then the fallback address.
        for label, ip in (('primary', primary_ip), ('fallback', fallback_ip)):
            if not ip:
                continue
            result[f'{label}_ping'] = self.test_ping(ip)
            if result[f'{label}_ping']:
                ssh_success, ssh_error = self.test_ssh(hostname, ip, user)
                result[f'{label}_ssh'] = ssh_success
                result[f'{label}_ssh_error'] = ssh_error

        # Determine status and recommendation
        result['status'], result['recommendation'] = self._analyze_connectivity(result)

        return result

    def _analyze_connectivity(self, result: Dict) -> Tuple[str, str]:
        """Analyze connectivity results and provide recommendations.

        Returns ``(status, recommendation)``. When the primary address
        answers ping, the verdict is based on the primary address alone; the
        fallback address is only considered when the primary ping failed.
        """
        hostname = result['hostname']
        primary_ip = result['primary_ip']
        fallback_ip = result['fallback_ip']

        if result['primary_ping']:
            # Primary IP works perfectly
            if result['primary_ssh']:
                return 'success', f"✓ {hostname} is fully accessible via primary IP {primary_ip}"

            # Primary ping works but SSH fails
            error = result['primary_ssh_error']
            if 'Permission denied' in error:
                return 'ssh_key', f"⚠ {hostname}: SSH key issue on {primary_ip} - run: make copy-ssh-key HOST={hostname}"
            if 'Connection refused' in error:
                return 'ssh_service', f"⚠ {hostname}: SSH service not running on {primary_ip}"
            return 'ssh_error', f"⚠ {hostname}: SSH error on {primary_ip} - {error}"

        # No fallback IP and primary failed
        if not fallback_ip:
            return 'no_fallback', f"✗ {hostname}: Primary IP {primary_ip} failed, no fallback available"

        # Primary IP fails, test fallback
        if not result['fallback_ping']:
            return 'both_failed', f"✗ {hostname}: Both primary {primary_ip} and fallback {fallback_ip} failed"
        if result['fallback_ssh']:
            return 'use_fallback', f"→ {hostname}: Switch to fallback IP {fallback_ip} (primary {primary_ip} failed)"
        return 'fallback_ssh', f"⚠ {hostname}: Fallback IP {fallback_ip} reachable but SSH failed"

    def run_tests(self) -> List[Dict]:
        """Run connectivity tests for all hosts.

        Prints one progress line per host and returns the list of result
        dicts in inventory order.
        """
        hosts = self.parse_hosts_file()
        results = []

        print("🔍 Testing host connectivity...")
        print("=" * 60)

        for host_data in hosts:
            # Flush so the host name is visible while the (slow) probes run.
            print(f"Testing {host_data['hostname']}...", end=' ', flush=True)
            result = self.test_host(host_data)
            results.append(result)

            # Print immediate status
            status = result['status']
            if status == 'success':
                print("✅")
            elif status in ('ssh_key', 'ssh_service', 'ssh_error'):
                print("⚠️")
            elif status == 'use_fallback':
                print("🔄")
            else:
                print("❌")

        return results

    def print_summary(self, results: List[Dict]):
        """Print detailed summary of connectivity test results."""
        print("\n📊 CONNECTIVITY SUMMARY")
        print("=" * 60)

        # Group results by status, keeping first-seen order.
        by_status = {}
        for result in results:
            by_status.setdefault(result['status'], []).append(result)

        # Print each status group
        for status, hosts in by_status.items():
            icon = self.STATUS_ICONS.get(status, '❓')
            print(f"\n{icon} {status.upper().replace('_', ' ')} ({len(hosts)} hosts)")
            print("-" * 40)

            for result in hosts:
                print(f" {result['hostname']:<20} {result['recommendation']}")

        # Print actionable recommendations
        self._print_recommendations(results)

    def _print_recommendations(self, results: List[Dict]):
        """Print actionable recommendations based on test results."""
        print("\n🛠️ RECOMMENDATIONS")
        print("=" * 60)

        # SSH key issues
        ssh_key_issues = [r for r in results if r['status'] == 'ssh_key']
        if ssh_key_issues:
            print(f"\n🔑 Fix SSH key issues ({len(ssh_key_issues)} hosts):")
            for result in ssh_key_issues:
                print(f" make copy-ssh-key HOST={result['hostname']}")

        # Fallback recommendations
        fallback_needed = [r for r in results if r['status'] == 'use_fallback']
        if fallback_needed:
            print(f"\n🔄 Switch to fallback IPs ({len(fallback_needed)} hosts):")
            # Quote the path so the suggested command survives copy/paste even
            # when the inventory path contains spaces or shell metacharacters.
            quoted_hosts_file = shlex.quote(str(self.hosts_file))
            for result in fallback_needed:
                name = result['hostname']
                old_entry = f"{name} ansible_host={result['primary_ip']}"
                new_entry = f"{name} ansible_host={result['fallback_ip']}"
                print(f" sed -i 's/{old_entry}/{new_entry}/' {quoted_hosts_file}")

        # Critical issues
        critical_issues = [r for r in results if r['status'] in self.CRITICAL_STATUSES]
        if critical_issues:
            print(f"\n🚨 Critical issues ({len(critical_issues)} hosts):")
            for result in critical_issues:
                print(f" {result['hostname']}: {result['recommendation']}")

        # Auto-fallback suggestion
        if fallback_needed:
            print("\n🤖 Or run auto-fallback to fix automatically:")
            print(" make auto-fallback")

    def export_json(self, results: List[Dict], output_file: str):
        """Export results to JSON file."""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        print(f"\n📄 Results exported to: {output_file}")


def main():
    """Parse command-line arguments, run the tests and print the summary."""
    parser = argparse.ArgumentParser(description='Advanced connectivity test for Ansible hosts')
    parser.add_argument('--hosts-file', default='inventories/production/hosts',
                        help='Path to hosts file')
    parser.add_argument('--timeout', type=int, default=3,
                        help='Ping timeout in seconds')
    parser.add_argument('--json', help='Export results to JSON file')
    parser.add_argument('--quiet', action='store_true',
                        help='Only show summary, not individual tests')

    args = parser.parse_args()

    if not Path(args.hosts_file).exists():
        print(f"❌ Error: Hosts file not found: {args.hosts_file}")
        sys.exit(1)

    tester = ConnectivityTester(args.hosts_file, args.timeout)

    if args.quiet:
        # Suppress individual test output. redirect_stdout restores the real
        # stdout even if run_tests() raises.
        with open(os.devnull, 'w') as devnull, contextlib.redirect_stdout(devnull):
            results = tester.run_tests()
    else:
        results = tester.run_tests()

    tester.print_summary(results)

    if args.json:
        tester.export_json(results, args.json)


if __name__ == '__main__':
    main()