POTE/scripts/generate_pattern_report.py
ilia 0d8d85adc1 Add complete automation, reporting, and CI/CD system
Features Added:
==============

📧 EMAIL REPORTING SYSTEM:
- EmailReporter: Send reports via SMTP (Gmail, SendGrid, custom)
- ReportGenerator: Generate daily/weekly summaries with HTML/text formatting
- Configurable via .env (SMTP_HOST, SMTP_PORT, etc.)
- Scripts: send_daily_report.py, send_weekly_report.py

🤖 AUTOMATED RUNS:
- automated_daily_run.sh: Full daily ETL pipeline + reporting
- automated_weekly_run.sh: Weekly pattern analysis + reports
- setup_cron.sh: Interactive cron job setup (5-minute setup)
- Logs saved to ~/logs/ with automatic cleanup

🔍 HEALTH CHECKS:
- health_check.py: System health monitoring
- Checks: DB connection, data freshness, counts, recent alerts
- JSON output for programmatic use
- Exit codes for monitoring integration

🚀 CI/CD PIPELINE:
- .github/workflows/ci.yml: Full CI/CD pipeline
- GitHub Actions / Gitea Actions compatible
- Jobs: lint & test, security scan, dependency scan, Docker build
- PostgreSQL service for integration tests
- 93 tests passing in CI

📚 COMPREHENSIVE DOCUMENTATION:
- AUTOMATION_QUICKSTART.md: 5-minute email setup guide
- docs/12_automation_and_reporting.md: Full automation guide
- Updated README.md with automation links
- Deployment → Production workflow guide

🛠️ IMPROVEMENTS:
- All shell scripts made executable
- Environment variable examples in .env.example
- Report logs saved with timestamps
- 30-day log retention with auto-cleanup
- Health checks can be scheduled via cron

WHAT THIS ENABLES:
==================
After deployment, users can:
1. Set up automated daily/weekly email reports (5 min)
2. Receive HTML+text emails with:
   - New trades, market alerts, suspicious timing
   - Weekly patterns, rankings, repeat offenders
3. Monitor system health automatically
4. Run full CI/CD pipeline on every commit
5. Deploy with confidence (tests + security scans)

USAGE:
======
# One-time setup (on deployed server)
./scripts/setup_cron.sh

# Or manually send reports
python scripts/send_daily_report.py --to user@example.com
python scripts/send_weekly_report.py --to user@example.com

# Check system health
python scripts/health_check.py

See AUTOMATION_QUICKSTART.md for full instructions.

93 tests passing | Full CI/CD | Email reports ready
2025-12-15 15:34:31 -05:00

235 lines
7.4 KiB
Python
Executable File

#!/usr/bin/env python
"""
Generate comprehensive pattern analysis report.
Identifies repeat offenders and systematic suspicious behavior.
"""
import click
from pathlib import Path
from tabulate import tabulate
from pote.db import get_session
from pote.monitoring.pattern_detector import PatternDetector
@click.command()
@click.option("--days", default=365, help="Analyze last N days (default: 365)")
@click.option("--output", help="Save report to file")
@click.option("--format", type=click.Choice(["text", "json"]), default="text")
def main(days, output, format):
    """Generate comprehensive pattern analysis report."""
    # NOTE: the docstring above is deliberately one line; click shows it as
    # the command's --help text.
    #
    # Parameters (populated by click from the options above):
    #   days   -- look-back window passed to the detector as ``lookback_days``.
    #   output -- optional path; when given, the rendered report is also
    #             written to this file.
    #   format -- "text" or "json". The name shadows the builtin, but it is
    #             fixed by the ``--format`` option name.
    import json

    # Hold on to the object returned by get_session() instead of discarding it
    # right after next(): that way whatever cleanup the provider performs
    # after handing out the session runs once we are done, not at some
    # unspecified earlier/later point decided by garbage collection.
    # NOTE(review): assumes get_session() is a generator-style provider
    # (it is consumed with next()); the getattr guard covers plain iterators.
    session_source = get_session()
    session = next(session_source)
    try:
        detector = PatternDetector(session)
        click.echo(f"\n🔍 Generating pattern analysis for last {days} days...\n")
        report_data = detector.generate_pattern_report(lookback_days=days)
    finally:
        close_source = getattr(session_source, "close", None)
        if close_source is not None:
            close_source()

    if format == "json":
        # default=str stringifies values json cannot encode natively.
        report = json.dumps(report_data, indent=2, default=str)
    else:
        report = format_pattern_report(report_data)

    click.echo(report)

    if output:
        # The text report contains emoji; write UTF-8 explicitly so saving
        # does not depend on the platform's locale encoding.
        Path(output).write_text(report, encoding="utf-8")
        click.echo(f"\n💾 Report saved to {output}")
_REPORT_WIDTH = 100


def _severity_emoji(score):
    """Return the marker for a timing score, using the footer legend's ranges."""
    if score >= 80:
        return "🚨"
    if score >= 60:
        return "🔴"
    if score >= 40:
        return "🟡"
    return "✅"


def _party_state(record):
    """Return the '<party initial>-<state>' tag; a missing/None party yields ''."""
    party = record.get('party') or ""
    return f"{party[:1]}-{record['state']}"


def _section_header(title):
    """Return the four lines that open a report section (blank, title, rule, blank)."""
    return ["", title, "=" * _REPORT_WIDTH, ""]


def format_pattern_report(data):
    """Format pattern data as text report.

    Args:
        data: Mapping with the required keys ``period_days`` and ``summary``
            (which must contain ``total_officials_analyzed``,
            ``repeat_offenders`` and ``avg_timing_score``), plus the optional
            sections ``top_suspicious_officials``, ``repeat_offenders``,
            ``suspicious_tickers`` (sequences of dicts), ``sector_analysis``
            and ``party_comparison`` (dicts of name -> stats dict). A section
            that is missing or empty is left out of the report.

    Returns:
        The whole report as a single newline-joined string.

    Raises:
        KeyError: If a required top-level key, or a field of an entry inside
            a non-empty section, is missing.
    """
    heavy_rule = "=" * _REPORT_WIDTH
    summary = data['summary']

    lines = [
        heavy_rule,
        " CONGRESSIONAL TRADING PATTERN ANALYSIS",
        f" Period: {data['period_days']} days",
        heavy_rule,
        "",
        "📊 SUMMARY",
        # Light rule under the heading (an empty string repeated 100 times
        # would render nothing at all).
        "-" * _REPORT_WIDTH,
        f"Officials Analyzed: {summary['total_officials_analyzed']}",
        f"Repeat Offenders: {summary['repeat_offenders']}",
        f"Average Timing Score: {summary['avg_timing_score']}/100",
        "",
    ]

    # Top Suspicious Officials
    officials = data.get('top_suspicious_officials')
    if officials:
        lines.extend(_section_header("🚨 TOP 10 MOST SUSPICIOUS OFFICIALS (By Timing Score)"))
        rows = [
            [
                # Marker follows the ranges printed in the interpretation
                # guide at the bottom of this report.
                f"{_severity_emoji(official['avg_timing_score'])} {rank}",
                official['name'],
                _party_state(official),
                official['chamber'],
                official['trade_count'],
                f"{official['suspicious_trades']}/{official['trade_count']}",
                f"{official['suspicious_rate']}%",
                f"{official['avg_timing_score']}/100",
            ]
            for rank, official in enumerate(officials[:10], 1)
        ]
        lines.append(tabulate(
            rows,
            headers=["Rank", "Official", "Party-State", "Chamber", "Trades", "Suspicious", "Rate", "Avg Score"],
            tablefmt="simple",
        ))
        lines.append("")

    # Repeat Offenders
    offenders = data.get('repeat_offenders')
    if offenders:
        lines.extend(_section_header("🔥 REPEAT OFFENDERS (50%+ Suspicious Trades)"))
        for offender in offenders:
            lines.extend([
                f"🚨 {offender['name']} ({_party_state(offender)}, {offender['chamber']})",
                f" Trades: {offender['trade_count']} | Suspicious: {offender['suspicious_trades']} ({offender['suspicious_rate']}%)",
                f" Avg Timing Score: {offender['avg_timing_score']}/100",
                f" Pattern: {offender['pattern']}",
                "",
            ])

    # Suspicious Tickers
    tickers = data.get('suspicious_tickers')
    if tickers:
        lines.extend(_section_header("📈 MOST SUSPICIOUSLY TRADED STOCKS"))
        rows = [
            [
                entry['ticker'],
                entry['trade_count'],
                f"{entry['trades_with_alerts']}/{entry['trade_count']}",
                f"{entry['suspicious_count']}/{entry['trade_count']}",
                f"{entry['suspicious_rate']}%",
                f"{entry['avg_timing_score']}/100",
            ]
            for entry in tickers[:10]
        ]
        lines.append(tabulate(
            rows,
            headers=["Ticker", "Total Trades", "With Alerts", "Suspicious", "Susp. Rate", "Avg Score"],
            tablefmt="simple",
        ))
        lines.append("")

    # Sector Analysis
    sector_stats = data.get('sector_analysis')
    if sector_stats:
        lines.extend(_section_header("🏭 SECTOR ANALYSIS"))
        # Highest suspicious rate first; a sector without a rate sorts as 0.
        ranked_sectors = sorted(
            sector_stats.items(),
            key=lambda item: item[1].get('suspicious_rate', 0),
            reverse=True,
        )
        rows = [
            [
                sector,
                stats['trade_count'],
                f"{stats['trades_with_alerts']}/{stats['trade_count']}",
                f"{stats['alert_rate']}%",
                f"{stats['suspicious_count']}/{stats['trade_count']}",
                f"{stats['suspicious_rate']}%",
                f"{stats['avg_timing_score']}/100",
            ]
            for sector, stats in ranked_sectors[:10]
        ]
        lines.append(tabulate(
            rows,
            headers=["Sector", "Trades", "W/ Alerts", "Alert %", "Suspicious", "Susp %", "Avg Score"],
            tablefmt="simple",
        ))
        lines.append("")

    # Party Comparison
    party_stats = data.get('party_comparison')
    if party_stats:
        lines.extend(_section_header("🏛️ PARTY COMPARISON"))
        rows = [
            [
                party,
                stats['official_count'],
                stats['total_trades'],
                f"{stats['total_suspicious']}/{stats['total_trades']}",
                f"{stats['suspicious_rate']}%",
                f"{stats['avg_timing_score']}/100",
            ]
            # Alphabetical by party name for a stable row order.
            for party, stats in sorted(party_stats.items())
        ]
        lines.append(tabulate(
            rows,
            headers=["Party", "Officials", "Total Trades", "Suspicious", "Susp. Rate", "Avg Score"],
            tablefmt="simple",
        ))
        lines.append("")

    # Footer
    lines.extend([
        "",
        heavy_rule,
        "📋 INTERPRETATION GUIDE",
        heavy_rule,
        "",
        "Timing Score Ranges:",
        " 🚨 80-100: Highly suspicious - Strong evidence of timing advantage",
        " 🔴 60-79: Suspicious - Likely timing advantage",
        " 🟡 40-59: Notable - Some unusual activity",
        " ✅ 0-39: Normal - No significant pattern",
        "",
        "Suspicious Rate:",
        " 50%+ = Repeat offender pattern",
        " 25-50% = Concerning frequency",
        " <25% = Within normal range",
        "",
        "⚠️ DISCLAIMER:",
        " This analysis is for research and transparency purposes only.",
        " High scores indicate statistical anomalies requiring further investigation.",
        " This is not legal proof of wrongdoing.",
        "",
        heavy_rule,
    ])

    return "\n".join(lines)
# Script entry point: invoke the click command only when this file is
# executed directly, so importing the module has no side effects.
if __name__ == "__main__":
    main()