ansible/test_connectivity.py
ilia d6655babd9
All checks were successful
CI / lint-and-test (pull_request) Successful in 1m0s
CI / ansible-validation (pull_request) Successful in 2m12s
CI / secret-scanning (pull_request) Successful in 54s
CI / dependency-scan (pull_request) Successful in 58s
CI / sast-scan (pull_request) Successful in 2m58s
CI / license-check (pull_request) Successful in 59s
CI / vault-check (pull_request) Successful in 2m50s
CI / playbook-test (pull_request) Successful in 2m42s
CI / container-scan (pull_request) Successful in 1m44s
CI / sonar-analysis (pull_request) Successful in 2m12s
CI / workflow-summary (pull_request) Successful in 51s
Refactor: Simplify connectivity analysis logic by breaking down into smaller helper functions for improved readability and maintainability
2025-12-15 14:55:10 -05:00

364 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Advanced connectivity test for Ansible hosts with fallback IP support.
Tests both primary and fallback IPs, provides detailed diagnostics, and suggests fixes.
"""
import subprocess
import sys
import argparse
import json
import re
from pathlib import Path
from typing import Dict, List, Tuple, Optional
class ConnectivityTester:
    """Probe every host of an INI-style Ansible inventory via ping and SSH.

    For each host that declares ``ansible_host=`` the tester checks the
    primary address and, when ``ansible_host_fallback=`` is present, the
    fallback address as well. Each host is then classified into a status
    (``success``, ``ssh_key``, ``use_fallback``, ...) with a human-readable
    recommendation.
    """

    def __init__(self, hosts_file: str, timeout: int = 3):
        """Store the inventory path and the ping timeout.

        Args:
            hosts_file: Path to the inventory file to parse.
            timeout: Value passed to ``ping -W``; the ping subprocess itself
                is killed after ``timeout + 1`` seconds.
        """
        self.hosts_file = Path(hosts_file)
        self.timeout = timeout
        # Not read anywhere in this class; kept because callers may use it.
        self.results = {}

    def test_ping(self, ip: str) -> bool:
        """Return True when a single ``ping`` to *ip* exits with status 0.

        A subprocess timeout or a missing ``ping`` binary is reported as
        "not reachable" (False) rather than raised.
        """
        # NOTE(review): `-W` is interpreted in seconds by Linux iputils but in
        # milliseconds by BSD/macOS ping -- confirm the target platform.
        command = ['ping', '-c', '1', '-W', str(self.timeout), ip]
        try:
            completed = subprocess.run(
                command,
                capture_output=True,
                timeout=self.timeout + 1,
            )
        except (subprocess.TimeoutExpired, FileNotFoundError):
            return False
        return completed.returncode == 0

    def test_ssh(self, hostname: str, ip: str, user: str) -> Tuple[bool, str]:
        """Try a non-interactive SSH login and return ``(success, error)``.

        Args:
            hostname: Inventory name of the host (currently unused; kept for
                interface compatibility).
            ip: Address to connect to.
            user: Remote user name.

        Returns:
            ``(True, "")`` when ``ssh ... exit`` returns 0, otherwise
            ``(False, message)`` where *message* is ssh's stderr or the text
            of the timeout / missing-binary exception.
        """
        # BatchMode=yes makes ssh fail instead of prompting for a password.
        # The 3 s connect timeout and 5 s overall timeout are fixed and do
        # not follow ``self.timeout``.
        command = [
            'ssh', '-o', 'ConnectTimeout=3', '-o', 'BatchMode=yes',
            f'{user}@{ip}', 'exit',
        ]
        try:
            completed = subprocess.run(command, capture_output=True, timeout=5)
        except (subprocess.TimeoutExpired, FileNotFoundError) as exc:
            return False, str(exc)

        if completed.returncode == 0:
            return True, ""
        # errors="replace": stderr may contain bytes that are not valid UTF-8
        # (e.g. a remote banner); a diagnostic tool must not crash on them.
        return False, completed.stderr.decode(errors="replace").strip()

    def parse_hosts_file(self) -> List[Dict]:
        """Parse the inventory file and return one dict per host entry.

        Only lines that appear under a ``[group]`` header and contain
        ``ansible_host=`` are treated as hosts. Blank lines and comment lines
        (starting with ``#`` or ``;``, the two comment markers of Ansible's
        INI inventory format) are skipped.
        """
        hosts = []
        current_group = None
        with open(self.hosts_file, 'r', encoding='utf-8') as handle:
            for raw_line in handle:
                line = raw_line.strip()
                # Skip empty lines and comments
                if not line or line.startswith(('#', ';')):
                    continue
                # Group header
                if line.startswith('[') and line.endswith(']'):
                    current_group = line[1:-1]
                    continue
                # Host entry
                if current_group and 'ansible_host=' in line:
                    host_data = self._parse_host_line(line, current_group)
                    if host_data:
                        hosts.append(host_data)
        return hosts

    def _parse_host_line(self, line: str, group: str) -> Optional[Dict]:
        """Split one inventory line into hostname plus ``key=value`` attributes.

        Returns None for a line without any whitespace-separated token.
        Missing attributes default to ``''`` for both IPs and ``'root'`` for
        the user.
        """
        tokens = line.split()
        if not tokens:
            return None
        hostname, *rest = tokens
        # Split on the first '=' only so that values may contain '='.
        attrs = dict(token.split('=', 1) for token in rest if '=' in token)
        return {
            'hostname': hostname,
            'group': group,
            'primary_ip': attrs.get('ansible_host', ''),
            'fallback_ip': attrs.get('ansible_host_fallback', ''),
            'user': attrs.get('ansible_user', 'root'),
            'original_line': line,
        }

    def test_host(self, host_data: Dict) -> Dict:
        """Probe one host's primary and fallback IPs and classify the outcome.

        SSH is attempted only for an address that answered ping. The fallback
        address is probed whenever it is defined, even if the primary works.

        Args:
            host_data: A dict as produced by ``_parse_host_line``.

        Returns:
            A result dict with the probe flags, SSH error texts, ``status``
            and ``recommendation``.
        """
        hostname = host_data['hostname']
        user = host_data['user']
        result = {
            'hostname': hostname,
            'group': host_data['group'],
            'primary_ip': host_data['primary_ip'],
            'fallback_ip': host_data['fallback_ip'],
            'user': user,
            'primary_ping': False,
            'primary_ssh': False,
            'fallback_ping': False,
            'fallback_ssh': False,
            'primary_ssh_error': '',
            'fallback_ssh_error': '',
            'recommendation': '',
            'status': 'unknown',
        }
        # Same probe sequence for both addresses, primary first.
        for label in ('primary', 'fallback'):
            ip = result[f'{label}_ip']
            if not ip:
                continue
            result[f'{label}_ping'] = self.test_ping(ip)
            if result[f'{label}_ping']:
                ssh_success, ssh_error = self.test_ssh(hostname, ip, user)
                result[f'{label}_ssh'] = ssh_success
                result[f'{label}_ssh_error'] = ssh_error
        # Determine status and recommendation
        result['status'], result['recommendation'] = self._analyze_connectivity(result)
        return result

    def _analyze_connectivity(self, result: Dict) -> Tuple[str, str]:
        """Map probe results to a ``(status, recommendation)`` pair.

        The handlers are tried in order and the first one that returns a
        non-None outcome wins.
        """
        handlers = (
            self._handle_primary_success,
            self._handle_primary_ping_only,
            self._handle_fallback_path,
            self._handle_no_fallback,
        )
        for handler in handlers:
            outcome = handler(result)
            if outcome is not None:
                return outcome
        # Defensive default: the four handlers above already cover every
        # combination of primary_ping / primary_ssh / fallback_ip truthiness.
        hostname = result["hostname"]
        return "unknown", f"? {hostname}: Unknown connectivity state"

    def _handle_primary_success(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Return a ``success`` outcome when primary ping and SSH both work."""
        if not (result.get("primary_ping") and result.get("primary_ssh")):
            return None
        hostname = result["hostname"]
        primary_ip = result["primary_ip"]
        return "success", f"{hostname} is fully accessible via primary IP {primary_ip}"

    def _handle_primary_ping_only(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Classify the SSH failure when the primary IP answers ping only."""
        if not result.get("primary_ping") or result.get("primary_ssh"):
            return None
        hostname = result["hostname"]
        primary_ip = result["primary_ip"]
        error = result.get("primary_ssh_error", "")
        # The status is derived from well-known substrings of ssh's stderr.
        if "Permission denied" in error:
            return (
                "ssh_key",
                f"{hostname}: SSH key issue on {primary_ip} - run: make copy-ssh-key HOST={hostname}",
            )
        if "Connection refused" in error:
            return "ssh_service", f"{hostname}: SSH service not running on {primary_ip}"
        return "ssh_error", f"{hostname}: SSH error on {primary_ip} - {error}"

    def _handle_fallback_path(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Classify a host whose primary ping failed and that has a fallback IP."""
        if result.get("primary_ping"):
            return None
        fallback_ip = result.get("fallback_ip")
        if not fallback_ip:
            return None
        hostname = result["hostname"]
        primary_ip = result["primary_ip"]
        if not result.get("fallback_ping"):
            return (
                "both_failed",
                f"{hostname}: Both primary {primary_ip} and fallback {fallback_ip} failed",
            )
        if result.get("fallback_ssh"):
            return (
                "use_fallback",
                f"{hostname}: Switch to fallback IP {fallback_ip} (primary {primary_ip} failed)",
            )
        return (
            "fallback_ssh",
            f"{hostname}: Fallback IP {fallback_ip} reachable but SSH failed",
        )

    def _handle_no_fallback(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Classify a host whose primary ping failed and that has no fallback IP."""
        if result.get("primary_ping") or result.get("fallback_ip"):
            return None
        hostname = result["hostname"]
        primary_ip = result["primary_ip"]
        return "no_fallback", f"{hostname}: Primary IP {primary_ip} failed, no fallback available"

    def run_tests(self) -> List[Dict]:
        """Probe every host in the inventory, printing one progress line each.

        Returns:
            The list of result dicts from ``test_host``, in inventory order.
        """
        hosts = self.parse_hosts_file()
        results = []
        print("🔍 Testing host connectivity...")
        print("=" * 60)
        for host_data in hosts:
            # flush so the host name is visible while the (slow) probes run,
            # not only once the status marker terminates the line.
            print(f"Testing {host_data['hostname']}...", end=' ', flush=True)
            result = self.test_host(host_data)
            results.append(result)
            # Print immediate status
            # NOTE(review): the success and failure markers below are both
            # empty strings, so the two cases look identical in the live
            # output -- confirm whether status glyphs were intended here.
            status = result['status']
            if status == 'success':
                print("")
            elif status in ['ssh_key', 'ssh_service', 'ssh_error']:
                print("⚠️")
            elif status == 'use_fallback':
                print("🔄")
            else:
                print("")
        return results

    def print_summary(self, results: List[Dict]):
        """Print results grouped by status, followed by the recommendations."""
        print("\n📊 CONNECTIVITY SUMMARY")
        print("=" * 60)
        # Group results by status (insertion order = first occurrence order)
        by_status = {}
        for result in results:
            by_status.setdefault(result['status'], []).append(result)
        # Print each status group
        status_icons = {
            'success': '',
            'ssh_key': '🔑',
            'ssh_service': '🔧',
            'ssh_error': '⚠️',
            'use_fallback': '🔄',
            'both_failed': '',
            'no_fallback': '🚫',
            'unknown': '',
        }
        for status, hosts in by_status.items():
            icon = status_icons.get(status, '')
            print(f"\n{icon} {status.upper().replace('_', ' ')} ({len(hosts)} hosts)")
            print("-" * 40)
            for result in hosts:
                print(f" {result['hostname']:<20} {result['recommendation']}")
        # Print actionable recommendations
        self._print_recommendations(results)

    def _print_recommendations(self, results: List[Dict]):
        """Print copy-pasteable commands for the problems found in *results*."""
        print("\n🛠️ RECOMMENDATIONS")
        print("=" * 60)
        # SSH key issues
        ssh_key_issues = [r for r in results if r['status'] == 'ssh_key']
        if ssh_key_issues:
            print(f"\n🔑 Fix SSH key issues ({len(ssh_key_issues)} hosts):")
            for result in ssh_key_issues:
                print(f" make copy-ssh-key HOST={result['hostname']}")
        # Fallback recommendations
        fallback_needed = [r for r in results if r['status'] == 'use_fallback']
        if fallback_needed:
            print(f"\n🔄 Switch to fallback IPs ({len(fallback_needed)} hosts):")
            for result in fallback_needed:
                # The suggested sed rewrites the host's ansible_host value in
                # the inventory from the primary to the fallback address.
                name = result['hostname']
                old_entry = f"{name} ansible_host={result['primary_ip']}"
                new_entry = f"{name} ansible_host={result['fallback_ip']}"
                print(f" sed -i 's/{old_entry}/{new_entry}/' {self.hosts_file}")
        # Critical issues
        critical_issues = [r for r in results if r['status'] in ['both_failed', 'no_fallback']]
        if critical_issues:
            print(f"\n🚨 Critical issues ({len(critical_issues)} hosts):")
            for result in critical_issues:
                print(f" {result['hostname']}: {result['recommendation']}")
        # Auto-fallback suggestion
        if fallback_needed:
            print("\n🤖 Or run auto-fallback to fix automatically:")
            print(" make auto-fallback")

    def export_json(self, results: List[Dict], output_file: str):
        """Write *results* to *output_file* as indented JSON and report the path."""
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2)
        print(f"\n📄 Results exported to: {output_file}")
def main():
    """Command-line entry point: parse arguments, run the tests, report.

    Exits with status 1 when the hosts file does not exist. With ``--quiet``
    the per-host progress output of ``run_tests`` is discarded and only the
    summary is printed; with ``--json`` the results are also exported.
    """
    parser = argparse.ArgumentParser(description='Advanced connectivity test for Ansible hosts')
    parser.add_argument('--hosts-file', default='inventories/production/hosts',
                        help='Path to hosts file')
    parser.add_argument('--timeout', type=int, default=3,
                        help='Ping timeout in seconds')
    parser.add_argument('--json', help='Export results to JSON file')
    parser.add_argument('--quiet', action='store_true',
                        help='Only show summary, not individual tests')
    args = parser.parse_args()

    if not Path(args.hosts_file).exists():
        print(f"❌ Error: Hosts file not found: {args.hosts_file}")
        sys.exit(1)

    tester = ConnectivityTester(args.hosts_file, args.timeout)
    if args.quiet:
        # Suppress individual test output. redirect_stdout restores
        # sys.stdout even if run_tests() raises, so the traceback and any
        # later output are not lost to a closed devnull handle.
        import contextlib
        import os
        with open(os.devnull, 'w') as devnull, contextlib.redirect_stdout(devnull):
            results = tester.run_tests()
    else:
        results = tester.run_tests()

    tester.print_summary(results)
    if args.json:
        tester.export_json(results, args.json)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()