#!/usr/bin/env python
"""
Generate comprehensive pattern analysis report.
Identifies repeat offenders and systematic suspicious behavior.
"""

import json
from pathlib import Path

import click
from tabulate import tabulate

from pote.db import get_session
from pote.monitoring.pattern_detector import PatternDetector


@click.command()
@click.option("--days", default=365, help="Analyze last N days (default: 365)")
@click.option("--output", help="Save report to file")
@click.option(
    "--format",
    "output_format",  # explicit dest: don't shadow the builtin `format`
    type=click.Choice(["text", "json"]),
    default="text",
)
def main(days, output, output_format):
    """Generate comprehensive pattern analysis report.

    Args:
        days: Lookback window in days passed to the detector.
        output: Optional file path; when given, the rendered report is also
            written there (in addition to being echoed to stdout).
        output_format: "text" renders human-readable tables via tabulate;
            "json" dumps the raw report dict.
    """
    session = next(get_session())
    detector = PatternDetector(session)

    click.echo(f"\nšŸ” Generating pattern analysis for last {days} days...\n")

    report_data = detector.generate_pattern_report(lookback_days=days)

    if output_format == "json":
        # default=str stringifies values json can't encode natively
        # (dates, Decimals) rather than raising.
        report = json.dumps(report_data, indent=2, default=str)
    else:
        report = format_pattern_report(report_data)

    click.echo(report)

    if output:
        Path(output).write_text(report)
        click.echo(f"\nšŸ’¾ Report saved to {output}")


def format_pattern_report(data):
    """Format pattern data as a plain-text report.

    Args:
        data: Report dict produced by PatternDetector.generate_pattern_report
            (keys: period_days, summary, top_suspicious_officials,
            repeat_offenders, suspicious_tickers, sector_analysis,
            party_comparison).

    Returns:
        The full report as a single newline-joined string.
    """
    lines = [
        "=" * 100,
        " CONGRESSIONAL TRADING PATTERN ANALYSIS",
        f" Period: {data['period_days']} days",
        "=" * 100,
        "",
        "šŸ“Š SUMMARY",
        "─" * 100,
        f"Officials Analyzed: {data['summary']['total_officials_analyzed']}",
        f"Repeat Offenders: {data['summary']['repeat_offenders']}",
        f"Average Timing Score: {data['summary']['avg_timing_score']}/100",
        "",
    ]

    # Top Suspicious Officials
    if data['top_suspicious_officials']:
        lines.extend([
            "",
            "🚨 TOP 10 MOST SUSPICIOUS OFFICIALS (By Timing Score)",
            "=" * 100,
            "",
        ])

        table_data = []
        for i, official in enumerate(data['top_suspicious_officials'][:10], 1):
            # Determine emoji based on severity
            if official['avg_timing_score'] >= 70:
                emoji = "🚨"
            elif official['avg_timing_score'] >= 50:
                emoji = "šŸ”“"
            else:
                emoji = "🟔"

            table_data.append([
                f"{emoji} {i}",
                official['name'],
                f"{official['party'][:1]}-{official['state']}",
                official['chamber'],
                official['trade_count'],
                f"{official['suspicious_trades']}/{official['trade_count']}",
                f"{official['suspicious_rate']}%",
                f"{official['avg_timing_score']}/100",
            ])

        lines.append(tabulate(
            table_data,
            headers=["Rank", "Official", "Party-State", "Chamber", "Trades", "Suspicious", "Rate", "Avg Score"],
            tablefmt="simple"
        ))
        lines.append("")

    # Repeat Offenders
    if data['repeat_offenders']:
        lines.extend([
            "",
            "šŸ”„ REPEAT OFFENDERS (50%+ Suspicious Trades)",
            "=" * 100,
            "",
        ])

        for offender in data['repeat_offenders']:
            lines.extend([
                f"🚨 {offender['name']} ({offender['party'][:1]}-{offender['state']}, {offender['chamber']})",
                f"   Trades: {offender['trade_count']} | Suspicious: {offender['suspicious_trades']} ({offender['suspicious_rate']}%)",
                f"   Avg Timing Score: {offender['avg_timing_score']}/100",
                f"   Pattern: {offender['pattern']}",
                "",
            ])

    # Suspicious Tickers
    if data['suspicious_tickers']:
        lines.extend([
            "",
            "šŸ“ˆ MOST SUSPICIOUSLY TRADED STOCKS",
            "=" * 100,
            "",
        ])

        table_data = []
        for ticker_data in data['suspicious_tickers'][:10]:
            table_data.append([
                ticker_data['ticker'],
                ticker_data['trade_count'],
                f"{ticker_data['trades_with_alerts']}/{ticker_data['trade_count']}",
                f"{ticker_data['suspicious_count']}/{ticker_data['trade_count']}",
                f"{ticker_data['suspicious_rate']}%",
                f"{ticker_data['avg_timing_score']}/100",
            ])

        lines.append(tabulate(
            table_data,
            headers=["Ticker", "Total Trades", "With Alerts", "Suspicious", "Susp. Rate", "Avg Score"],
            tablefmt="simple"
        ))
        lines.append("")

    # Sector Analysis
    if data['sector_analysis']:
        lines.extend([
            "",
            "šŸ­ SECTOR ANALYSIS",
            "=" * 100,
            "",
        ])

        # Sort sectors by suspicious rate
        sectors = sorted(
            data['sector_analysis'].items(),
            key=lambda x: x[1].get('suspicious_rate', 0),
            reverse=True
        )

        table_data = []
        for sector, stats in sectors[:10]:
            table_data.append([
                sector,
                stats['trade_count'],
                f"{stats['trades_with_alerts']}/{stats['trade_count']}",
                f"{stats['alert_rate']}%",
                f"{stats['suspicious_count']}/{stats['trade_count']}",
                f"{stats['suspicious_rate']}%",
                f"{stats['avg_timing_score']}/100",
            ])

        lines.append(tabulate(
            table_data,
            headers=["Sector", "Trades", "W/ Alerts", "Alert %", "Suspicious", "Susp %", "Avg Score"],
            tablefmt="simple"
        ))
        lines.append("")

    # Party Comparison
    if data['party_comparison']:
        lines.extend([
            "",
            "šŸ›ļø PARTY COMPARISON",
            "=" * 100,
            "",
        ])

        table_data = []
        for party, stats in sorted(data['party_comparison'].items()):
            table_data.append([
                party,
                stats['official_count'],
                stats['total_trades'],
                f"{stats['total_suspicious']}/{stats['total_trades']}",
                f"{stats['suspicious_rate']}%",
                f"{stats['avg_timing_score']}/100",
            ])

        lines.append(tabulate(
            table_data,
            headers=["Party", "Officials", "Total Trades", "Suspicious", "Susp. Rate", "Avg Score"],
            tablefmt="simple"
        ))
        lines.append("")

    # Footer
    lines.extend([
        "",
        "=" * 100,
        "šŸ“‹ INTERPRETATION GUIDE",
        "=" * 100,
        "",
        "Timing Score Ranges:",
        "  🚨 80-100: Highly suspicious - Strong evidence of timing advantage",
        "  šŸ”“ 60-79: Suspicious - Likely timing advantage",
        "  🟔 40-59: Notable - Some unusual activity",
        "  āœ… 0-39: Normal - No significant pattern",
        "",
        "Suspicious Rate:",
        "  50%+ = Repeat offender pattern",
        "  25-50% = Concerning frequency",
        "  <25% = Within normal range",
        "",
        "āš ļø DISCLAIMER:",
        "  This analysis is for research and transparency purposes only.",
        "  High scores indicate statistical anomalies requiring further investigation.",
        "  This is not legal proof of wrongdoing.",
        "",
        "=" * 100,
    ])

    return "\n".join(lines)


if __name__ == "__main__":
    main()


# --- src/pote/monitoring/__init__.py (as patched by this change) ---
# Registers PatternDetector in the monitoring package's public API.
#
# from .alert_manager import AlertManager
# from .disclosure_correlator import DisclosureCorrelator
# from .market_monitor import MarketMonitor
# from .pattern_detector import PatternDetector
#
# __all__ = ["MarketMonitor", "AlertManager", "DisclosureCorrelator", "PatternDetector"]
"""
Pattern detection across officials and stocks.
Identifies recurring suspicious behavior and trading patterns.
"""

import logging
from datetime import date, timedelta
from decimal import Decimal
from typing import Any

from sqlalchemy import and_, func
from sqlalchemy.orm import Session

from pote.db.models import MarketAlert, Official, Security, Trade
from pote.monitoring.disclosure_correlator import DisclosureCorrelator

logger = logging.getLogger(__name__)


class PatternDetector:
    """
    Detect patterns in congressional trading behavior.
    Identifies repeat offenders and systematic advantages.
    """

    def __init__(self, session: Session):
        """Initialize pattern detector."""
        # All queries run through this session; the correlator shares it so
        # per-trade timing analysis sees the same transactional state.
        self.session = session
        self.correlator = DisclosureCorrelator(session)

    def rank_officials_by_timing(
        self, lookback_days: int = 365, min_trades: int = 3
    ) -> list[dict[str, Any]]:
        """
        Rank officials by suspicious timing scores.

        Args:
            lookback_days: Days of history to analyze
            min_trades: Minimum trades to include official

        Returns:
            List of officials ranked by avg timing score (descending);
            each entry carries identity fields plus alert/suspicious counts
            and rates expressed in percentage points (0-100).
        """
        since_date = date.today() - timedelta(days=lookback_days)

        # Get all officials with recent trades.
        # NOTE(review): group_by(Official.id) while selecting non-aggregated
        # columns relies on SQLite/MySQL leniency — strict backends would
        # need all selected columns in the GROUP BY; confirm target DB.
        officials_with_trades = (
            self.session.query(
                Official.id,
                Official.name,
                Official.chamber,
                Official.party,
                Official.state,
                func.count(Trade.id).label("trade_count"),
            )
            .join(Trade)
            .filter(Trade.transaction_date >= since_date)
            .group_by(Official.id)
            .having(func.count(Trade.id) >= min_trades)
            .all()
        )

        logger.info(
            f"Analyzing {len(officials_with_trades)} officials with {min_trades}+ trades"
        )

        rankings = []

        for official_data in officials_with_trades:
            # The SQL-level trade_count is only used for the min_trades cut;
            # the reported counts below come from the correlator's pattern.
            official_id, name, chamber, party, state, trade_count = official_data

            # Get timing pattern (one correlator call per official — N+1
            # style, but bounded by the HAVING filter above).
            pattern = self.correlator.get_official_timing_pattern(
                official_id, lookback_days
            )

            if pattern["trade_count"] == 0:
                continue

            # Calculate percentages (guards are redundant after the
            # trade_count == 0 skip above, kept for safety).
            alert_rate = (
                pattern["trades_with_prior_alerts"] / pattern["trade_count"]
                if pattern["trade_count"] > 0
                else 0
            )
            suspicious_rate = (
                pattern["suspicious_trade_count"] / pattern["trade_count"]
                if pattern["trade_count"] > 0
                else 0
            )

            rankings.append(
                {
                    "official_id": official_id,
                    "name": name,
                    "chamber": chamber,
                    "party": party,
                    "state": state,
                    "trade_count": pattern["trade_count"],
                    "trades_with_alerts": pattern["trades_with_prior_alerts"],
                    "suspicious_trades": pattern["suspicious_trade_count"],
                    "highly_suspicious_trades": pattern["highly_suspicious_count"],
                    "avg_timing_score": pattern["avg_timing_score"],
                    # Rates stored as percentage points (e.g. 62.5), not fractions.
                    "alert_rate": round(alert_rate * 100, 1),
                    "suspicious_rate": round(suspicious_rate * 100, 1),
                    "pattern": pattern["pattern"],
                }
            )

        # Sort by average timing score (descending)
        rankings.sort(key=lambda x: x["avg_timing_score"], reverse=True)

        return rankings

    def identify_repeat_offenders(
        self, lookback_days: int = 365, min_suspicious_rate: float = 0.5
    ) -> list[dict[str, Any]]:
        """
        Identify officials with consistent suspicious timing.

        Args:
            lookback_days: Days of history
            min_suspicious_rate: Minimum fraction (0.0-1.0) of suspicious
                trades required; compared against the percentage-point
                rate after scaling by 100.

        Returns:
            List of repeat offenders
        """
        # Stricter min_trades=5 here: a "repeat" pattern needs volume.
        rankings = self.rank_officials_by_timing(lookback_days, min_trades=5)

        # Filter for high suspicious rates (suspicious_rate is 0-100).
        offenders = [
            r for r in rankings if r["suspicious_rate"] >= min_suspicious_rate * 100
        ]

        logger.info(
            f"Found {len(offenders)} officials with {min_suspicious_rate*100}%+ suspicious trades"
        )

        return offenders

    def analyze_ticker_patterns(
        self, lookback_days: int = 365, min_trades: int = 3
    ) -> list[dict[str, Any]]:
        """
        Analyze which tickers show most suspicious trading patterns.

        Args:
            lookback_days: Days of history
            min_trades: Minimum trades to include ticker

        Returns:
            List of tickers ranked by timing patterns (avg score, descending)
        """
        since_date = date.today() - timedelta(days=lookback_days)

        # Get tickers with enough trades
        tickers_with_trades = (
            self.session.query(
                Security.ticker, func.count(Trade.id).label("trade_count")
            )
            .join(Trade)
            .filter(Trade.transaction_date >= since_date)
            .group_by(Security.ticker)
            .having(func.count(Trade.id) >= min_trades)
            .all()
        )

        logger.info(f"Analyzing {len(tickers_with_trades)} tickers")

        ticker_patterns = []

        for ticker, trade_count in tickers_with_trades:
            # One correlator call per ticker; counts reported below come
            # from the correlator, not the SQL aggregate.
            analysis = self.correlator.get_ticker_timing_analysis(
                ticker, lookback_days
            )

            if analysis["trade_count"] == 0:
                continue

            suspicious_rate = (
                analysis["suspicious_count"] / analysis["trade_count"]
                if analysis["trade_count"] > 0
                else 0
            )

            ticker_patterns.append(
                {
                    "ticker": ticker,
                    "trade_count": analysis["trade_count"],
                    "trades_with_alerts": analysis["trades_with_alerts"],
                    "suspicious_count": analysis["suspicious_count"],
                    "avg_timing_score": analysis["avg_timing_score"],
                    "suspicious_rate": round(suspicious_rate * 100, 1),
                }
            )

        # Sort by average timing score
        ticker_patterns.sort(key=lambda x: x["avg_timing_score"], reverse=True)

        return ticker_patterns

    def get_sector_timing_analysis(
        self, lookback_days: int = 365
    ) -> dict[str, dict[str, Any]]:
        """
        Analyze timing patterns by sector.

        Args:
            lookback_days: Days of history

        Returns:
            Dict mapping sector to timing stats (counts plus derived
            avg_timing_score / alert_rate / suspicious_rate percentages)
        """
        since_date = date.today() - timedelta(days=lookback_days)

        # Get trades grouped by sector.
        # NOTE(review): trade.security below may lazy-load per row unless the
        # relationship is eager — consider joinedload if this gets slow.
        trades = (
            self.session.query(Trade)
            .join(Trade.security)
            .filter(Trade.transaction_date >= since_date)
            .all()
        )

        logger.info(f"Analyzing {len(trades)} trades by sector")

        sector_stats: dict[str, dict[str, Any]] = {}

        for trade in trades:
            # Skip trades without a classified sector.
            if not trade.security or not trade.security.sector:
                continue

            sector = trade.security.sector

            if sector not in sector_stats:
                sector_stats[sector] = {
                    "trade_count": 0,
                    "trades_with_alerts": 0,
                    "suspicious_count": 0,
                    "total_timing_score": 0,
                }

            # Analyze this trade (one correlator call per trade).
            analysis = self.correlator.analyze_trade(trade)

            sector_stats[sector]["trade_count"] += 1
            sector_stats[sector]["total_timing_score"] += analysis["timing_score"]

            if analysis["alert_count"] > 0:
                sector_stats[sector]["trades_with_alerts"] += 1

            if analysis["suspicious"]:
                sector_stats[sector]["suspicious_count"] += 1

        # Calculate averages
        for sector, stats in sector_stats.items():
            if stats["trade_count"] > 0:
                stats["avg_timing_score"] = round(
                    stats["total_timing_score"] / stats["trade_count"], 2
                )
                stats["alert_rate"] = round(
                    stats["trades_with_alerts"] / stats["trade_count"] * 100, 1
                )
                stats["suspicious_rate"] = round(
                    stats["suspicious_count"] / stats["trade_count"] * 100, 1
                )

        return sector_stats

    def get_party_comparison(
        self, lookback_days: int = 365
    ) -> dict[str, dict[str, Any]]:
        """
        Compare timing patterns between political parties.

        Args:
            lookback_days: Days of history

        Returns:
            Dict mapping party to timing stats (per-party totals plus
            trade-weighted avg_timing_score and suspicious_rate)
        """
        # min_trades=1 so every official with any activity is counted.
        rankings = self.rank_officials_by_timing(lookback_days, min_trades=1)

        party_stats: dict[str, dict[str, Any]] = {}

        for ranking in rankings:
            party = ranking["party"]

            if party not in party_stats:
                party_stats[party] = {
                    "official_count": 0,
                    "total_trades": 0,
                    "total_suspicious": 0,
                    "total_timing_score": 0,
                    "officials": [],
                }

            party_stats[party]["official_count"] += 1
            party_stats[party]["total_trades"] += ranking["trade_count"]
            party_stats[party]["total_suspicious"] += ranking["suspicious_trades"]
            # Weight each official's average score by their trade count so
            # the party average is per-trade, not per-official.
            party_stats[party]["total_timing_score"] += (
                ranking["avg_timing_score"] * ranking["trade_count"]
            )
            party_stats[party]["officials"].append(ranking)

        # Calculate averages
        for party, stats in party_stats.items():
            if stats["total_trades"] > 0:
                stats["avg_timing_score"] = round(
                    stats["total_timing_score"] / stats["total_trades"], 2
                )
                stats["suspicious_rate"] = round(
                    stats["total_suspicious"] / stats["total_trades"] * 100, 1
                )

        return party_stats

    def generate_pattern_report(self, lookback_days: int = 365) -> dict[str, Any]:
        """
        Generate comprehensive pattern analysis report.

        Args:
            lookback_days: Days of history

        Returns:
            Complete pattern analysis: period_days, summary (counts and
            mean per-official timing score), top 10 officials, repeat
            offenders, top 10 tickers, sector and party breakdowns.
        """
        logger.info(f"Generating comprehensive pattern report for last {lookback_days} days")

        # Get all analyses. NOTE(review): each call re-runs
        # rank_officials_by_timing internally, so the correlator is invoked
        # several times per official for one report — acceptable for a batch
        # job, but a caching layer would help if this becomes interactive.
        official_rankings = self.rank_officials_by_timing(lookback_days, min_trades=3)
        repeat_offenders = self.identify_repeat_offenders(lookback_days)
        ticker_patterns = self.analyze_ticker_patterns(lookback_days, min_trades=3)
        sector_analysis = self.get_sector_timing_analysis(lookback_days)
        party_comparison = self.get_party_comparison(lookback_days)

        # Calculate summary statistics
        total_officials = len(official_rankings)
        total_offenders = len(repeat_offenders)

        # Unweighted mean of per-official averages (not per-trade).
        avg_timing_score = (
            sum(r["avg_timing_score"] for r in official_rankings) / total_officials
            if total_officials > 0
            else 0
        )

        return {
            "period_days": lookback_days,
            "summary": {
                "total_officials_analyzed": total_officials,
                "repeat_offenders": total_offenders,
                "avg_timing_score": round(avg_timing_score, 2),
            },
            "top_suspicious_officials": official_rankings[:10],
            "repeat_offenders": repeat_offenders,
            "suspicious_tickers": ticker_patterns[:10],
            "sector_analysis": sector_analysis,
            "party_comparison": party_comparison,
        }


# --- tests/test_pattern_detector.py (file continues past this block) ---
"""Tests for pattern detection module."""

import pytest
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal

from pote.monitoring.pattern_detector import PatternDetector
from pote.db.models import Official, Security, Trade, MarketAlert


@pytest.fixture
def multiple_officials_with_patterns(test_db_session):
    """Create multiple officials with different timing patterns."""
    session = test_db_session

    # Create officials
    pelosi = Official(name="Nancy Pelosi", chamber="House", party="Democrat", state="CA")
    tuberville = Official(name="Tommy Tuberville", chamber="Senate", party="Republican", state="AL")
    clean_trader = Official(name="Clean Trader", chamber="House", party="Independent", state="TX")

    session.add_all([pelosi, tuberville, clean_trader])
    session.flush()

    # Create securities
    nvda = Security(ticker="NVDA", name="NVIDIA", sector="Technology")
    msft = Security(ticker="MSFT", name="Microsoft", sector="Technology")
    xom = Security(ticker="XOM", name="Exxon", sector="Energy")

    session.add_all([nvda, msft, xom])
    session.flush()

    # Pelosi - Suspicious pattern (trades with alerts): every one of the
    # 5 NVDA buys is preceded by two market alerts a few days earlier.
    for i in range(5):
        trade_date = date(2024, 1, 15) + timedelta(days=i*30)

        # Create trade
        trade = Trade(
            official_id=pelosi.id,
            security_id=nvda.id,
            source="test",
            transaction_date=trade_date,
            side="buy",
            value_min=Decimal("15001"),
            value_max=Decimal("50000"),
        )
        session.add(trade)
        session.flush()

        # Create alerts BEFORE trade (suspicious)
        for j in range(2):
            alert = MarketAlert(
                ticker="NVDA",
                alert_type="unusual_volume",
                # Alert 3-4 days before the trade, at midnight UTC.
                timestamp=datetime.combine(
                    trade_date - timedelta(days=3+j),
                    datetime.min.time()
                ).replace(tzinfo=timezone.utc),
                severity=7 + j,
            )
            session.add(alert)

    # Tuberville - Mixed pattern: 4 MSFT buys, only the first 2 preceded
    # by an alert.
    for i in range(4):
        trade_date = date(2024, 2, 1) + timedelta(days=i*30)

        trade = Trade(
            official_id=tuberville.id,
            security_id=msft.id,
            source="test",
            transaction_date=trade_date,
            side="buy",
            value_min=Decimal("10000"),
            value_max=Decimal("50000"),
        )
        session.add(trade)
        session.flush()

        # Only first 2 trades have alerts
        if i < 2:
            alert = MarketAlert(
                ticker="MSFT",
                alert_type="price_spike",
                timestamp=datetime.combine(
                    trade_date - timedelta(days=5),
                    datetime.min.time()
                ).replace(tzinfo=timezone.utc),
                severity=6,
            )
            session.add(alert)

    # Clean trader - No suspicious activity (3 XOM buys, zero alerts)
    for i in range(3):
        trade_date = date(2024, 3, 1) + timedelta(days=i*30)

        trade = Trade(
            official_id=clean_trader.id,
            security_id=xom.id,
            source="test",
            transaction_date=trade_date,
            side="buy",
            value_min=Decimal("10000"),
            value_max=Decimal("50000"),
        )
        session.add(trade)

    session.commit()

    return {
        "officials": [pelosi, tuberville, clean_trader],
        "securities": [nvda, msft, xom],
    }


def test_rank_officials_by_timing(test_db_session, multiple_officials_with_patterns):
    """Test ranking officials by timing scores."""
    session = test_db_session
    detector = PatternDetector(session)

    # lookback_days=3650 so the fixed 2024 fixture dates stay in range.
    rankings = detector.rank_officials_by_timing(lookback_days=3650, min_trades=3)

    assert len(rankings) >= 2  # At least 2 officials with 3+ trades

    # Rankings should be sorted by avg_timing_score (descending)
    for i in range(len(rankings) - 1):
        assert rankings[i]["avg_timing_score"] >= rankings[i + 1]["avg_timing_score"]

    # Check required fields
    for ranking in rankings:
        assert "name" in ranking
        assert "party" in ranking
        assert "chamber" in ranking
        assert "trade_count" in ranking
        assert "avg_timing_score" in ranking
        assert "suspicious_rate" in ranking


def test_identify_repeat_offenders(test_db_session, multiple_officials_with_patterns):
    """Test identifying repeat offenders."""
    session = test_db_session
    detector = PatternDetector(session)

    # Set low threshold to catch Pelosi (who has 100% suspicious rate)
    offenders = detector.identify_repeat_offenders(
        lookback_days=3650,
        min_suspicious_rate=0.7  # 70%+
    )

    # Should find at least Pelosi (all trades with alerts)
    assert isinstance(offenders, list)

    # All offenders should have high suspicious rates
    for offender in offenders:
        assert offender["suspicious_rate"] >= 70


def test_analyze_ticker_patterns(test_db_session, multiple_officials_with_patterns):
    """Test ticker pattern analysis."""
    session = test_db_session
    detector = PatternDetector(session)

    ticker_patterns = detector.analyze_ticker_patterns(
        lookback_days=3650,
        min_trades=3
    )

    assert isinstance(ticker_patterns, list)
    assert len(ticker_patterns) >= 1  # At least NVDA should qualify

    # Check sorting (descending by average timing score)
    for i in range(len(ticker_patterns) - 1):
        assert ticker_patterns[i]["avg_timing_score"] >= ticker_patterns[i + 1]["avg_timing_score"]

    # Check fields
    for pattern in ticker_patterns:
        assert "ticker" in pattern
        assert "trade_count" in pattern
        assert "avg_timing_score" in pattern
        assert "suspicious_rate" in pattern


def test_get_sector_timing_analysis(test_db_session, multiple_officials_with_patterns):
    """Test sector timing analysis."""
    session = test_db_session
    detector = PatternDetector(session)

    sector_stats = detector.get_sector_timing_analysis(lookback_days=3650)

    assert isinstance(sector_stats, dict)
    assert len(sector_stats) >= 2  # Technology and Energy

    # Check Technology sector (should have alerts)
    if "Technology" in sector_stats:
        tech = sector_stats["Technology"]
        assert tech["trade_count"] >= 9  # 5 NVDA + 4 MSFT
        assert "avg_timing_score" in tech
        assert "alert_rate" in tech
        assert "suspicious_rate" in tech


def test_get_party_comparison(test_db_session, multiple_officials_with_patterns):
    """Test party comparison analysis."""
    session = test_db_session
    detector = PatternDetector(session)

    party_stats = detector.get_party_comparison(lookback_days=3650)

    assert isinstance(party_stats, dict)
    # Fixture creates Democrat, Republican and Independent officials;
    # assertion is lenient and only requires two of them to appear.
    assert len(party_stats) >= 2

    # Check that we have data for each party
    for party, stats in party_stats.items():
        assert "official_count" in stats
        assert "total_trades" in stats
        assert "avg_timing_score" in stats
        assert "suspicious_rate" in stats


def test_generate_pattern_report(test_db_session, multiple_officials_with_patterns):
    """Test comprehensive pattern report generation."""
    session = test_db_session
    detector = PatternDetector(session)

    report = detector.generate_pattern_report(lookback_days=3650)

    # Check report structure
    assert "period_days" in report
    assert "summary" in report
    assert "top_suspicious_officials" in report
    assert "repeat_offenders" in report
    assert "suspicious_tickers" in report
    assert "sector_analysis" in report
    assert "party_comparison" in report

    # Check summary
    summary = report["summary"]
    assert summary["total_officials_analyzed"] >= 2
    assert "avg_timing_score" in summary

    # Check that lists are populated
    assert len(report["top_suspicious_officials"]) >= 2
    assert isinstance(report["suspicious_tickers"], list)


def test_rank_officials_min_trades_filter(test_db_session, multiple_officials_with_patterns):
    """Test that min_trades filter works correctly."""
    session = test_db_session
    detector = PatternDetector(session)

    # With min_trades=5, should only get Pelosi (5 trades)
    rankings_high = detector.rank_officials_by_timing(lookback_days=3650, min_trades=5)

    # With min_trades=3, should get at least 2 officials
    rankings_low = detector.rank_officials_by_timing(lookback_days=3650, min_trades=3)

    assert len(rankings_low) >= len(rankings_high)

    # All officials should meet min_trades requirement
    for ranking in rankings_high:
        assert ranking["trade_count"] >= 5


def test_empty_data_handling(test_db_session):
    """Test handling of empty dataset."""
    session = test_db_session
    detector = PatternDetector(session)

    # With no data, should return empty results
    # (no fixture used, so the database has no trades at all)
    rankings = detector.rank_officials_by_timing(lookback_days=30, min_trades=1)
    assert rankings == []

    offenders = detector.identify_repeat_offenders(lookback_days=30)
    assert offenders == []

    tickers = detector.analyze_ticker_patterns(lookback_days=30)
    assert tickers == []

    sectors = detector.get_sector_timing_analysis(lookback_days=30)
    assert sectors == {}


def test_ranking_score_accuracy(test_db_session, multiple_officials_with_patterns):
    """Test that rankings accurately reflect timing patterns."""
    session = test_db_session
    detector = PatternDetector(session)

    rankings = detector.rank_officials_by_timing(lookback_days=3650, min_trades=3)

    # Find Pelosi and Clean Trader
    pelosi_rank = next((r for r in rankings if "Pelosi" in r["name"]), None)
    clean_rank = next((r for r in rankings if "Clean" in r["name"]), None)

    if pelosi_rank and clean_rank:
        # Pelosi (with alerts) should have higher score than clean trader (no alerts)
        assert pelosi_rank["avg_timing_score"] > clean_rank["avg_timing_score"]
        assert pelosi_rank["trades_with_alerts"] > clean_rank["trades_with_alerts"]


def test_sector_stats_accuracy(test_db_session, multiple_officials_with_patterns):
    """Test sector statistics are calculated correctly."""
    session = test_db_session
    detector = PatternDetector(session)

    sector_stats = detector.get_sector_timing_analysis(lookback_days=3650)

    # Energy should have clean pattern (no alerts)
    if "Energy" in sector_stats:
        energy = sector_stats["Energy"]
        assert energy["suspicious_count"] == 0
        assert energy["alert_rate"] == 0.0


def test_party_stats_completeness(test_db_session, multiple_officials_with_patterns):
    """Test party statistics completeness."""
    session = test_db_session
    detector = PatternDetector(session)

    party_stats = detector.get_party_comparison(lookback_days=3650)

    # Check Democrats (Pelosi)
    if "Democrat" in party_stats:
        dem = party_stats["Democrat"]
        assert dem["official_count"] >= 1
        assert dem["total_trades"] >= 5  # Pelosi has 5 trades
        assert dem["total_suspicious"] > 0  # Pelosi has suspicious trades