ansible/test_connectivity.py
ilia e05b3aa0d5 Update ansible.cfg and auto-fallback script for improved connectivity handling
- Modify ansible.cfg to increase SSH connection retries from 2 to 3 and add a connection timeout setting for better reliability.
- Enhance auto-fallback.sh script to provide detailed feedback during IP connectivity tests, including clearer status messages for primary and fallback IP checks.
- Update documentation to reflect changes in connectivity testing and fallback procedures.

These updates improve the robustness of the connectivity testing process and ensure smoother operations during IP failover scenarios.
2025-09-16 23:00:32 -04:00

312 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Advanced connectivity test for Ansible hosts with fallback IP support.
Tests both primary and fallback IPs, provides detailed diagnostics, and suggests fixes.
"""
import subprocess
import sys
import argparse
import json
import re
from pathlib import Path
from typing import Dict, List, Tuple, Optional
class ConnectivityTester:
    """Probe hosts from an Ansible INI inventory via ping and SSH.

    Each host line may carry ``ansible_host`` (primary IP),
    ``ansible_host_fallback`` (fallback IP) and ``ansible_user``. Both IPs are
    probed, the outcome is classified into a status string, and a
    human-readable recommendation is produced for every host.
    """

    # INI section suffixes that hold variables or child-group names rather
    # than host entries; lines inside them must not be parsed as hosts.
    _NON_HOST_SECTION_SUFFIXES = (':vars', ':children')

    # Icon shown in the summary header for each status.
    _STATUS_ICONS = {
        'success': '✅',
        'ssh_key': '🔑',
        'ssh_service': '🔧',
        'ssh_error': '⚠️',
        'use_fallback': '🔄',
        'fallback_ssh': '⚠️',
        'both_failed': '❌',
        'no_fallback': '🚫',
        'unknown': '❓',
    }

    def __init__(self, hosts_file: str, timeout: int = 3):
        """Store the inventory path and the per-probe timeout.

        Args:
            hosts_file: Path to the inventory file to read.
            timeout: Value passed to ping's ``-W`` option and to SSH's
                ``ConnectTimeout`` option.
        """
        self.hosts_file = Path(hosts_file)
        self.timeout = timeout
        self.results = {}

    def test_ping(self, ip: str) -> bool:
        """Return True when a single ping to *ip* exits with status 0.

        A subprocess timeout or a failure to launch ``ping`` counts as
        "unreachable" rather than raising.
        """
        try:
            completed = subprocess.run(
                ['ping', '-c', '1', '-W', str(self.timeout), ip],
                capture_output=True,
                # One extra second so ping can time out on its own first.
                timeout=self.timeout + 1,
            )
        except (subprocess.TimeoutExpired, OSError):
            return False
        return completed.returncode == 0

    def test_ssh(self, hostname: str, ip: str, user: str) -> Tuple[bool, str]:
        """Try a non-interactive SSH login and return ``(success, error_message)``.

        Args:
            hostname: Inventory name of the host (not used for the connection;
                kept for interface compatibility).
            ip: Address to connect to.
            user: Remote user name.

        Returns:
            ``(True, "")`` when ``ssh ... exit`` returns 0; otherwise
            ``(False, message)`` where *message* is ssh's stderr or the text
            of the exception that prevented the command from completing.
        """
        try:
            completed = subprocess.run(
                ['ssh',
                 # Honour the configured timeout instead of a fixed 3 seconds.
                 '-o', f'ConnectTimeout={self.timeout}',
                 # BatchMode stops ssh from prompting for a password.
                 '-o', 'BatchMode=yes',
                 f'{user}@{ip}', 'exit'],
                capture_output=True,
                # Two seconds of slack on top of the connect timeout
                # (equals the previous 3s/5s pair at the default timeout).
                timeout=self.timeout + 2,
            )
        except (subprocess.TimeoutExpired, OSError) as exc:
            return False, str(exc)

        if completed.returncode == 0:
            return True, ""
        # errors='replace' keeps undecodable stderr bytes from raising.
        return False, completed.stderr.decode(errors='replace').strip()

    def parse_hosts_file(self) -> List[Dict]:
        """Read the inventory file and return one dict per host entry.

        Only lines that contain ``ansible_host=`` and sit under a host group
        header are considered. Blank lines, ``#``/``;`` comment lines, lines
        before the first group header, and lines inside ``[...:vars]`` or
        ``[...:children]`` sections are skipped.
        """
        hosts = []
        current_group = None

        with open(self.hosts_file, 'r', encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()

                # Skip empty lines and comments
                if not line or line.startswith(('#', ';')):
                    continue

                # Group header
                if line.startswith('[') and line.endswith(']'):
                    current_group = line[1:-1]
                    continue

                # Not inside a section that lists hosts
                if not current_group or current_group.endswith(
                        self._NON_HOST_SECTION_SUFFIXES):
                    continue

                # Host entry
                if 'ansible_host=' not in line:
                    continue
                host_data = self._parse_host_line(line, current_group)
                if host_data:
                    hosts.append(host_data)

        return hosts

    def _parse_host_line(self, line: str, group: str) -> Optional[Dict]:
        """Split one inventory line into hostname plus ``key=value`` attributes.

        Returns None for a line with no tokens. Missing attributes default to
        an empty string for both IPs and ``'root'`` for the user.
        """
        tokens = line.split()
        if not tokens:
            return None

        hostname, *attr_tokens = tokens
        # Split on the first '=' only, so values may themselves contain '='.
        attrs = dict(tok.split('=', 1) for tok in attr_tokens if '=' in tok)

        return {
            'hostname': hostname,
            'group': group,
            'primary_ip': attrs.get('ansible_host', ''),
            'fallback_ip': attrs.get('ansible_host_fallback', ''),
            'user': attrs.get('ansible_user', 'root'),
            'original_line': line,
        }

    def _probe_ip(self, hostname: str, ip: str, user: str) -> Tuple[bool, bool, str]:
        """Ping *ip* and, only if it answers, try SSH.

        Returns ``(ping_ok, ssh_ok, ssh_error)``; an empty *ip* is not probed.
        """
        if not ip or not self.test_ping(ip):
            return False, False, ''
        ssh_ok, ssh_error = self.test_ssh(hostname, ip, user)
        return True, ssh_ok, ssh_error

    def test_host(self, host_data: Dict) -> Dict:
        """Probe the primary and fallback IPs of one host.

        Args:
            host_data: A dict as produced by ``_parse_host_line``.

        Returns:
            A dict with the host identity, the ping/SSH outcome for each IP,
            the SSH error texts, and the derived ``status`` and
            ``recommendation``.
        """
        hostname = host_data['hostname']
        primary_ip = host_data['primary_ip']
        fallback_ip = host_data['fallback_ip']
        user = host_data['user']

        result = {
            'hostname': hostname,
            'group': host_data['group'],
            'primary_ip': primary_ip,
            'fallback_ip': fallback_ip,
            'user': user,
            'primary_ping': False,
            'primary_ssh': False,
            'fallback_ping': False,
            'fallback_ssh': False,
            'primary_ssh_error': '',
            'fallback_ssh_error': '',
            'recommendation': '',
            'status': 'unknown',
        }

        # Both addresses are always probed so the exported data is complete.
        for label, ip in (('primary', primary_ip), ('fallback', fallback_ip)):
            ping_ok, ssh_ok, ssh_error = self._probe_ip(hostname, ip, user)
            result[f'{label}_ping'] = ping_ok
            result[f'{label}_ssh'] = ssh_ok
            result[f'{label}_ssh_error'] = ssh_error

        # Determine status and recommendation
        result['status'], result['recommendation'] = self._analyze_connectivity(result)
        return result

    def _analyze_connectivity(self, result: Dict) -> Tuple[str, str]:
        """Classify probe results into ``(status, recommendation)``.

        The primary IP takes precedence: the fallback outcome is only
        consulted when the primary IP does not answer ping.
        """
        hostname = result['hostname']
        primary_ip = result['primary_ip']
        fallback_ip = result['fallback_ip']

        if result['primary_ping']:
            # Primary IP works perfectly
            if result['primary_ssh']:
                return 'success', f"{hostname} is fully accessible via primary IP {primary_ip}"

            # Primary ping works but SSH fails: classify by ssh's error text
            error = result['primary_ssh_error']
            if 'Permission denied' in error:
                return 'ssh_key', f"{hostname}: SSH key issue on {primary_ip} - run: make copy-ssh-key HOST={hostname}"
            if 'Connection refused' in error:
                return 'ssh_service', f"{hostname}: SSH service not running on {primary_ip}"
            return 'ssh_error', f"{hostname}: SSH error on {primary_ip} - {error}"

        # Primary IP fails, look at the fallback
        if fallback_ip:
            if result['fallback_ping'] and result['fallback_ssh']:
                return 'use_fallback', f"{hostname}: Switch to fallback IP {fallback_ip} (primary {primary_ip} failed)"
            if result['fallback_ping']:
                return 'fallback_ssh', f"{hostname}: Fallback IP {fallback_ip} reachable but SSH failed"
            return 'both_failed', f"{hostname}: Both primary {primary_ip} and fallback {fallback_ip} failed"

        # No fallback IP and primary failed
        if not fallback_ip:
            return 'no_fallback', f"{hostname}: Primary IP {primary_ip} failed, no fallback available"

        # Defensive fall-through; the branches above cover every combination.
        return 'unknown', f"? {hostname}: Unknown connectivity state"

    def run_tests(self) -> List[Dict]:
        """Probe every host in the inventory, printing one progress line each.

        Returns:
            The list of per-host result dicts from ``test_host``.
        """
        hosts = self.parse_hosts_file()
        results = []

        print("🔍 Testing host connectivity...")
        print("=" * 60)

        for host_data in hosts:
            print(f"Testing {host_data['hostname']}...", end=' ')
            result = self.test_host(host_data)
            results.append(result)

            # Print immediate status; success and failure get distinct marks
            # so the progress output can be read at a glance.
            status = result['status']
            if status == 'success':
                print("✅")
            elif status in ('ssh_key', 'ssh_service', 'ssh_error'):
                print("⚠️")
            elif status == 'use_fallback':
                print("🔄")
            else:
                print("❌")

        return results

    def print_summary(self, results: List[Dict]):
        """Print results grouped by status, followed by the recommendations."""
        print("\n📊 CONNECTIVITY SUMMARY")
        print("=" * 60)

        # Group results by status, keeping first-seen order
        by_status = {}
        for result in results:
            by_status.setdefault(result['status'], []).append(result)

        # Print each status group
        for status, hosts in by_status.items():
            icon = self._STATUS_ICONS.get(status, '❓')
            print(f"\n{icon} {status.upper().replace('_', ' ')} ({len(hosts)} hosts)")
            print("-" * 40)
            for result in hosts:
                print(f" {result['hostname']:<20} {result['recommendation']}")

        # Print actionable recommendations
        self._print_recommendations(results)

    def _print_recommendations(self, results: List[Dict]):
        """Print copy-pasteable commands for the problems found in *results*."""
        print("\n🛠️ RECOMMENDATIONS")
        print("=" * 60)

        # SSH key issues
        ssh_key_issues = [r for r in results if r['status'] == 'ssh_key']
        if ssh_key_issues:
            print(f"\n🔑 Fix SSH key issues ({len(ssh_key_issues)} hosts):")
            for result in ssh_key_issues:
                print(f" make copy-ssh-key HOST={result['hostname']}")

        # Fallback recommendations
        fallback_needed = [r for r in results if r['status'] == 'use_fallback']
        if fallback_needed:
            print(f"\n🔄 Switch to fallback IPs ({len(fallback_needed)} hosts):")
            for result in fallback_needed:
                name = result['hostname']
                old_ip = result['primary_ip']
                new_ip = result['fallback_ip']
                print(f" sed -i 's/{name} ansible_host={old_ip}/{name} ansible_host={new_ip}/' {self.hosts_file}")

        # Critical issues
        critical_issues = [r for r in results if r['status'] in ('both_failed', 'no_fallback')]
        if critical_issues:
            print(f"\n🚨 Critical issues ({len(critical_issues)} hosts):")
            for result in critical_issues:
                print(f" {result['hostname']}: {result['recommendation']}")

        # Auto-fallback suggestion
        if fallback_needed:
            print("\n🤖 Or run auto-fallback to fix automatically:")
            print(" make auto-fallback")

    def export_json(self, results: List[Dict], output_file: str):
        """Write *results* to *output_file* as indented JSON and report the path."""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        print(f"\n📄 Results exported to: {output_file}")
def main():
    """Command-line entry point: parse options, run the probes, print the summary.

    Exits with status 1 when the hosts file does not exist. With ``--quiet``
    the per-host progress output of ``run_tests`` is discarded; the summary
    is always printed. With ``--json`` the results are also written to the
    given file.
    """
    # Local imports: only this function needs them.
    import contextlib
    import os

    parser = argparse.ArgumentParser(description='Advanced connectivity test for Ansible hosts')
    parser.add_argument('--hosts-file', default='inventories/production/hosts',
                        help='Path to hosts file')
    parser.add_argument('--timeout', type=int, default=3,
                        help='Ping timeout in seconds')
    parser.add_argument('--json', help='Export results to JSON file')
    parser.add_argument('--quiet', action='store_true',
                        help='Only show summary, not individual tests')
    args = parser.parse_args()

    if not Path(args.hosts_file).exists():
        print(f"❌ Error: Hosts file not found: {args.hosts_file}")
        sys.exit(1)

    tester = ConnectivityTester(args.hosts_file, args.timeout)

    if args.quiet:
        # Suppress individual test output. redirect_stdout restores
        # sys.stdout even if run_tests() raises, unlike a manual swap.
        with open(os.devnull, 'w') as devnull, contextlib.redirect_stdout(devnull):
            results = tester.run_tests()
    else:
        results = tester.run_tests()

    tester.print_summary(results)

    if args.json:
        tester.export_json(results, args.json)


if __name__ == '__main__':
    main()