All checks were successful
CI / lint-and-test (pull_request) Successful in 1m0s
CI / ansible-validation (pull_request) Successful in 2m12s
CI / secret-scanning (pull_request) Successful in 54s
CI / dependency-scan (pull_request) Successful in 58s
CI / sast-scan (pull_request) Successful in 2m58s
CI / license-check (pull_request) Successful in 59s
CI / vault-check (pull_request) Successful in 2m50s
CI / playbook-test (pull_request) Successful in 2m42s
CI / container-scan (pull_request) Successful in 1m44s
CI / sonar-analysis (pull_request) Successful in 2m12s
CI / workflow-summary (pull_request) Successful in 51s
364 lines
13 KiB
Python
364 lines
13 KiB
Python
#!/usr/bin/env python3
"""
Advanced connectivity test for Ansible hosts with fallback IP support.

Tests both primary and fallback IPs, provides detailed diagnostics, and suggests fixes.
"""
|
|
|
|
import argparse
import contextlib
import json
import os
import re
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Tuple, Optional
|
|
|
|
class ConnectivityTester:
    """Probe hosts from an Ansible INI inventory via ping and SSH.

    Each host line may carry ``ansible_host`` (primary address),
    ``ansible_host_fallback`` (secondary address) and ``ansible_user``.
    Both addresses are probed, the outcome is classified into a status
    string, and a human-readable recommendation is attached.
    """

    # SSH probe limits in seconds: the ConnectTimeout option handed to ssh
    # and the hard cap applied to the whole ssh subprocess.
    SSH_CONNECT_TIMEOUT = 3
    SSH_PROCESS_TIMEOUT = 5

    # INI sections with these suffixes hold variables or child group names,
    # never host entries, so their lines must not be parsed as hosts.
    _NON_HOST_SECTION_SUFFIXES = (':vars', ':children')

    # Characters that start a comment line in an INI inventory.
    _COMMENT_PREFIXES = ('#', ';')

    # Statuses that mean "host answers ping but SSH is not usable".
    _WARNING_STATUSES = ('ssh_key', 'ssh_service', 'ssh_error', 'fallback_ssh')

    def __init__(self, hosts_file: str, timeout: int = 3):
        """Store the inventory path and the ping timeout (seconds)."""
        self.hosts_file = Path(hosts_file)
        self.timeout = timeout
        self.results = {}

    def test_ping(self, ip: str) -> bool:
        """Return True if a single ping to *ip* succeeds.

        A timeout of the subprocess or a missing ``ping`` binary is
        reported as "not reachable" rather than raised.
        """
        try:
            result = subprocess.run(
                ['ping', '-c', '1', '-W', str(self.timeout), ip],
                capture_output=True,
                # One extra second so ping's own -W timeout fires first.
                timeout=self.timeout + 1
            )
        except (subprocess.TimeoutExpired, FileNotFoundError):
            return False
        return result.returncode == 0

    def test_ssh(self, hostname: str, ip: str, user: str) -> Tuple[bool, str]:
        """Try a non-interactive SSH login and return ``(success, error_message)``.

        *hostname* is accepted for interface compatibility but is not used;
        the connection is made to ``user@ip``. On failure the message is
        ssh's stderr, or the exception text if ssh timed out or is missing.
        """
        try:
            result = subprocess.run(
                ['ssh', '-o', f'ConnectTimeout={self.SSH_CONNECT_TIMEOUT}',
                 '-o', 'BatchMode=yes',
                 f'{user}@{ip}', 'exit'],
                capture_output=True,
                timeout=self.SSH_PROCESS_TIMEOUT
            )
        except (subprocess.TimeoutExpired, FileNotFoundError) as e:
            return False, str(e)

        if result.returncode == 0:
            return True, ""
        # errors='replace': stderr may contain bytes that are not valid
        # UTF-8 (e.g. a server banner); never let decoding abort the run.
        return False, result.stderr.decode(errors='replace').strip()

    def parse_hosts_file(self) -> List[Dict]:
        """Parse the inventory file and return one dict per host entry.

        Only lines that appear under a ``[group]`` header and contain
        ``ansible_host=`` are treated as hosts. Blank lines, comment lines
        (``#`` or ``;``) and the contents of ``[group:vars]`` /
        ``[group:children]`` sections are skipped.
        """
        hosts = []
        current_group = None

        with open(self.hosts_file, 'r', encoding='utf-8', errors='replace') as f:
            for raw_line in f:
                line = raw_line.strip()

                # Skip empty lines and comments
                if not line or line.startswith(self._COMMENT_PREFIXES):
                    continue

                # Group header
                if line.startswith('[') and line.endswith(']'):
                    current_group = line[1:-1]
                    continue

                # Variable / children sections contain no host entries; a
                # "ansible_host=..." line there is a group variable.
                if not current_group or current_group.endswith(
                        self._NON_HOST_SECTION_SUFFIXES):
                    continue

                # Host entry
                if 'ansible_host=' in line:
                    host_data = self._parse_host_line(line, current_group)
                    if host_data:
                        hosts.append(host_data)

        return hosts

    def _parse_host_line(self, line: str, group: str) -> Optional[Dict]:
        """Split one inventory host line into a host dict.

        The first whitespace-separated token is the hostname; the remaining
        ``key=value`` tokens are attributes. Returns None for an empty line.
        ``user`` defaults to ``'root'`` and missing IPs to ``''``.
        """
        parts = line.split()
        if not parts:
            return None

        hostname = parts[0]
        # Split on the first '=' only so values may themselves contain '='.
        attrs = dict(part.split('=', 1) for part in parts[1:] if '=' in part)

        return {
            'hostname': hostname,
            'group': group,
            'primary_ip': attrs.get('ansible_host', ''),
            'fallback_ip': attrs.get('ansible_host_fallback', ''),
            'user': attrs.get('ansible_user', 'root'),
            'original_line': line
        }

    def test_host(self, host_data: Dict) -> Dict:
        """Probe the primary and fallback address of one host.

        For each non-empty address a ping is attempted, and SSH is only
        tried when the ping succeeded. Returns a result dict that includes
        the probe flags, SSH error texts, a ``status`` and a
        ``recommendation``.
        """
        hostname = host_data['hostname']
        primary_ip = host_data['primary_ip']
        fallback_ip = host_data['fallback_ip']
        user = host_data['user']

        result = {
            'hostname': hostname,
            'group': host_data['group'],
            'primary_ip': primary_ip,
            'fallback_ip': fallback_ip,
            'user': user,
            'primary_ping': False,
            'primary_ssh': False,
            'fallback_ping': False,
            'fallback_ssh': False,
            'primary_ssh_error': '',
            'fallback_ssh_error': '',
            'recommendation': '',
            'status': 'unknown'
        }

        # Primary first, then fallback; both are always probed when set.
        for label, ip in (('primary', primary_ip), ('fallback', fallback_ip)):
            if not ip:
                continue
            reachable = self.test_ping(ip)
            result[f'{label}_ping'] = reachable
            if reachable:
                ssh_success, ssh_error = self.test_ssh(hostname, ip, user)
                result[f'{label}_ssh'] = ssh_success
                result[f'{label}_ssh_error'] = ssh_error

        # Determine status and recommendation
        result['status'], result['recommendation'] = self._analyze_connectivity(result)

        return result

    def _analyze_connectivity(self, result: Dict) -> Tuple[str, str]:
        """Classify a probe result and return ``(status, recommendation)``.

        Handlers are tried in order; the first one that returns a value
        wins. The trailing "unknown" return is a defensive default.
        """
        handlers = (
            self._handle_primary_success,
            self._handle_primary_ping_only,
            self._handle_fallback_path,
            self._handle_no_fallback,
        )
        for handler in handlers:
            outcome = handler(result)
            if outcome is not None:
                return outcome

        hostname = result["hostname"]
        return "unknown", f"? {hostname}: Unknown connectivity state"

    def _handle_primary_success(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Return the "success" outcome when primary ping and SSH both worked."""
        if result.get("primary_ping") and result.get("primary_ssh"):
            hostname = result["hostname"]
            primary_ip = result["primary_ip"]
            return "success", f"✓ {hostname} is fully accessible via primary IP {primary_ip}"
        return None

    def _handle_primary_ping_only(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Classify the case where the primary IP pings but SSH failed.

        The SSH error text selects between ``ssh_key`` ("Permission denied"),
        ``ssh_service`` ("Connection refused") and the generic ``ssh_error``.
        """
        if not result.get("primary_ping") or result.get("primary_ssh"):
            return None

        hostname = result["hostname"]
        primary_ip = result["primary_ip"]
        error = result.get("primary_ssh_error", "")

        if "Permission denied" in error:
            return (
                "ssh_key",
                f"⚠ {hostname}: SSH key issue on {primary_ip} - run: make copy-ssh-key HOST={hostname}",
            )
        if "Connection refused" in error:
            return "ssh_service", f"⚠ {hostname}: SSH service not running on {primary_ip}"
        return "ssh_error", f"⚠ {hostname}: SSH error on {primary_ip} - {error}"

    def _handle_fallback_path(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Classify the case where primary ping failed and a fallback IP exists."""
        if result.get("primary_ping"):
            return None

        fallback_ip = result.get("fallback_ip")
        if not fallback_ip:
            return None

        hostname = result["hostname"]
        primary_ip = result["primary_ip"]

        if result.get("fallback_ping"):
            if result.get("fallback_ssh"):
                return (
                    "use_fallback",
                    f"→ {hostname}: Switch to fallback IP {fallback_ip} (primary {primary_ip} failed)",
                )
            return (
                "fallback_ssh",
                f"⚠ {hostname}: Fallback IP {fallback_ip} reachable but SSH failed",
            )

        return (
            "both_failed",
            f"✗ {hostname}: Both primary {primary_ip} and fallback {fallback_ip} failed",
        )

    def _handle_no_fallback(self, result: Dict) -> Optional[Tuple[str, str]]:
        """Classify the case where primary ping failed and no fallback IP is set."""
        if result.get("primary_ping"):
            return None

        fallback_ip = result.get("fallback_ip")
        if fallback_ip:
            return None

        hostname = result["hostname"]
        primary_ip = result["primary_ip"]
        return "no_fallback", f"✗ {hostname}: Primary IP {primary_ip} failed, no fallback available"

    def run_tests(self) -> List[Dict]:
        """Probe every host in the inventory and return the list of results.

        Prints a one-line progress entry per host while running.
        """
        hosts = self.parse_hosts_file()
        results = []

        print("🔍 Testing host connectivity...")
        print("=" * 60)

        for host_data in hosts:
            # flush so the host name is visible while the (slow) probe runs;
            # without it the partial line sits in the buffer until the icon
            # and newline are printed.
            print(f"Testing {host_data['hostname']}...", end=' ', flush=True)
            result = self.test_host(host_data)
            results.append(result)

            # Print immediate status
            status = result['status']
            if status == 'success':
                print("✅")
            elif status in self._WARNING_STATUSES:
                print("⚠️")
            elif status == 'use_fallback':
                print("🔄")
            else:
                print("❌")

        return results

    def print_summary(self, results: List[Dict]) -> None:
        """Print results grouped by status, followed by recommendations."""
        print("\n📊 CONNECTIVITY SUMMARY")
        print("=" * 60)

        # Group results by status (insertion order = first occurrence).
        by_status = {}
        for result in results:
            by_status.setdefault(result['status'], []).append(result)

        # Every status _analyze_connectivity can produce has an icon here.
        status_icons = {
            'success': '✅',
            'ssh_key': '🔑',
            'ssh_service': '🔧',
            'ssh_error': '⚠️',
            'use_fallback': '🔄',
            'fallback_ssh': '⚠️',
            'both_failed': '❌',
            'no_fallback': '🚫',
            'unknown': '❓'
        }

        for status, hosts in by_status.items():
            icon = status_icons.get(status, '❓')
            print(f"\n{icon} {status.upper().replace('_', ' ')} ({len(hosts)} hosts)")
            print("-" * 40)

            for result in hosts:
                print(f" {result['hostname']:<20} {result['recommendation']}")

        # Print actionable recommendations
        self._print_recommendations(results)

    def _print_recommendations(self, results: List[Dict]) -> None:
        """Print copy-pasteable commands and critical hosts for the given results."""
        print("\n🛠️ RECOMMENDATIONS")
        print("=" * 60)

        # SSH key issues
        ssh_key_issues = [r for r in results if r['status'] == 'ssh_key']
        if ssh_key_issues:
            print(f"\n🔑 Fix SSH key issues ({len(ssh_key_issues)} hosts):")
            for result in ssh_key_issues:
                print(f" make copy-ssh-key HOST={result['hostname']}")

        # Fallback recommendations
        fallback_needed = [r for r in results if r['status'] == 'use_fallback']
        if fallback_needed:
            print(f"\n🔄 Switch to fallback IPs ({len(fallback_needed)} hosts):")
            for result in fallback_needed:
                host = result['hostname']
                print(f" sed -i 's/{host} ansible_host={result['primary_ip']}/{host} ansible_host={result['fallback_ip']}/' {self.hosts_file}")

        # Critical issues
        critical_issues = [r for r in results if r['status'] in ['both_failed', 'no_fallback']]
        if critical_issues:
            print(f"\n🚨 Critical issues ({len(critical_issues)} hosts):")
            for result in critical_issues:
                print(f" {result['hostname']}: {result['recommendation']}")

        # Auto-fallback suggestion
        if fallback_needed:
            print("\n🤖 Or run auto-fallback to fix automatically:")
            print(" make auto-fallback")

    def export_json(self, results: List[Dict], output_file: str) -> None:
        """Write *results* to *output_file* as indented JSON.

        The file is written as UTF-8 with non-ASCII characters kept as-is,
        so the symbols in the recommendation strings stay readable.
        """
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        print(f"\n📄 Results exported to: {output_file}")
|
|
|
|
def main():
    """Command-line entry point: parse arguments, run the probes, report.

    Exits with status 1 if the hosts file does not exist. With ``--quiet``
    the per-host progress output of ``run_tests`` is discarded and only the
    summary is printed. With ``--json`` the results are also written to the
    given file.
    """
    parser = argparse.ArgumentParser(description='Advanced connectivity test for Ansible hosts')
    parser.add_argument('--hosts-file', default='inventories/production/hosts',
                        help='Path to hosts file')
    parser.add_argument('--timeout', type=int, default=3,
                        help='Ping timeout in seconds')
    parser.add_argument('--json', help='Export results to JSON file')
    parser.add_argument('--quiet', action='store_true',
                        help='Only show summary, not individual tests')

    args = parser.parse_args()

    if not Path(args.hosts_file).exists():
        print(f"❌ Error: Hosts file not found: {args.hosts_file}")
        sys.exit(1)

    tester = ConnectivityTester(args.hosts_file, args.timeout)

    if args.quiet:
        # Suppress individual test output. redirect_stdout restores
        # sys.stdout even if run_tests raises, so a failure cannot leave
        # stdout pointing at the closed devnull handle.
        with open(os.devnull, 'w') as devnull, contextlib.redirect_stdout(devnull):
            results = tester.run_tests()
    else:
        results = tester.run_tests()

    tester.print_summary(results)

    if args.json:
        tester.export_json(results, args.json)


if __name__ == '__main__':
    main()
|