Phase 3: Pattern Detection & Comparative Analysis - COMPLETE

COMPLETE: Cross-official pattern detection and ranking system

New Module:
- src/pote/monitoring/pattern_detector.py: Pattern analysis engine
  * rank_officials_by_timing(): Rank all officials by suspicion
  * identify_repeat_offenders(): Find systematic offenders
  * analyze_ticker_patterns(): Per-stock suspicious patterns
  * get_sector_timing_analysis(): Sector-level analysis
  * get_party_comparison(): Democrat vs Republican comparison
  * generate_pattern_report(): Comprehensive report

Analysis Features:
- Official Rankings:
  * By average timing score
  * Suspicious trade percentage
  * Alert rates
  * Pattern classification

- Repeat Offender Detection:
  * Identifies officials with 50%+ suspicious trades
  * Historical pattern tracking
  * Systematic timing advantage detection

- Comparative Analysis:
  * Cross-party comparison
  * Sector analysis
  * Ticker-specific patterns
  * Statistical aggregations

New Script:
- scripts/generate_pattern_report.py: Comprehensive reports
  * Top 10 most suspicious officials
  * Repeat offenders list
  * Most suspiciously traded stocks
  * Sector breakdowns
  * Party comparison stats
  * Text/JSON formats

New Tests (11 total, all passing):
- test_rank_officials_by_timing
- test_identify_repeat_offenders
- test_analyze_ticker_patterns
- test_get_sector_timing_analysis
- test_get_party_comparison
- test_generate_pattern_report
- test_rank_officials_min_trades_filter
- test_empty_data_handling
- test_ranking_score_accuracy
- test_sector_stats_accuracy
- test_party_stats_completeness

Usage:
  python scripts/generate_pattern_report.py --days 365

Report Includes:
- Top suspicious officials ranked
- Repeat offenders (50%+ suspicious rate)
- Most suspiciously traded tickers
- Sector analysis
- Party comparison
- Interpretation guide

Total Test Suite: 93 tests passing 

ALL 3 PHASES COMPLETE!
This commit is contained in:
ilia 2025-12-15 15:23:40 -05:00
parent a52313145b
commit 2ec4a8e373
4 changed files with 919 additions and 1 deletions

View File

@@ -0,0 +1,233 @@
#!/usr/bin/env python
"""
Generate comprehensive pattern analysis report.
Identifies repeat offenders and systematic suspicious behavior.
"""
import click
from pathlib import Path
from tabulate import tabulate
from pote.db import get_session
from pote.monitoring.pattern_detector import PatternDetector
@click.command()
@click.option("--days", default=365, help="Analyze last N days (default: 365)")
@click.option("--output", help="Save report to file")
@click.option("--format", type=click.Choice(["text", "json"]), default="text")
def main(days, output, format):
    """Generate comprehensive pattern analysis report."""
    # One DB session for the whole run; get_session() yields sessions.
    db_session = next(get_session())
    detector = PatternDetector(db_session)

    click.echo(f"\n🔍 Generating pattern analysis for last {days} days...\n")
    analysis = detector.generate_pattern_report(lookback_days=days)

    # Render either the human-readable text layout or machine-readable JSON.
    if format == "text":
        rendered = format_pattern_report(analysis)
    else:
        import json

        # default=str covers dates/Decimals that json cannot serialize natively.
        rendered = json.dumps(analysis, indent=2, default=str)

    click.echo(rendered)

    if output:
        Path(output).write_text(rendered)
        click.echo(f"\n💾 Report saved to {output}")
def _severity_emoji(score):
    """Map an average timing score to the severity marker used in the tables."""
    if score >= 70:
        return "🚨"
    if score >= 50:
        return "🔴"
    return "🟡"


def format_pattern_report(data):
    """Format pattern data as text report.

    Args:
        data: Dict produced by PatternDetector.generate_pattern_report();
            keys used: period_days, summary, top_suspicious_officials,
            repeat_offenders, suspicious_tickers, sector_analysis,
            party_comparison.

    Returns:
        The complete multi-section report as a single newline-joined string.
    """
    lines = [
        "=" * 100,
        " CONGRESSIONAL TRADING PATTERN ANALYSIS",
        f" Period: {data['period_days']} days",
        "=" * 100,
        "",
        "📊 SUMMARY",
        # BUG FIX: was '"" * 100', which multiplies the empty string and
        # produces nothing; a dashed separator line was clearly intended.
        "-" * 100,
        f"Officials Analyzed: {data['summary']['total_officials_analyzed']}",
        f"Repeat Offenders: {data['summary']['repeat_offenders']}",
        f"Average Timing Score: {data['summary']['avg_timing_score']}/100",
        "",
    ]

    # Top Suspicious Officials
    if data['top_suspicious_officials']:
        lines.extend([
            "",
            "🚨 TOP 10 MOST SUSPICIOUS OFFICIALS (By Timing Score)",
            "=" * 100,
            "",
        ])
        table_data = []
        for i, official in enumerate(data['top_suspicious_officials'][:10], 1):
            # Determine emoji based on severity
            emoji = _severity_emoji(official['avg_timing_score'])
            table_data.append([
                f"{emoji} {i}",
                official['name'],
                f"{official['party'][:1]}-{official['state']}",
                official['chamber'],
                official['trade_count'],
                f"{official['suspicious_trades']}/{official['trade_count']}",
                f"{official['suspicious_rate']}%",
                f"{official['avg_timing_score']}/100",
            ])
        lines.append(tabulate(
            table_data,
            headers=["Rank", "Official", "Party-State", "Chamber", "Trades", "Suspicious", "Rate", "Avg Score"],
            tablefmt="simple"
        ))
        lines.append("")

    # Repeat Offenders
    if data['repeat_offenders']:
        lines.extend([
            "",
            "🔥 REPEAT OFFENDERS (50%+ Suspicious Trades)",
            "=" * 100,
            "",
        ])
        for offender in data['repeat_offenders']:
            lines.extend([
                f"🚨 {offender['name']} ({offender['party'][:1]}-{offender['state']}, {offender['chamber']})",
                f"   Trades: {offender['trade_count']} | Suspicious: {offender['suspicious_trades']} ({offender['suspicious_rate']}%)",
                f"   Avg Timing Score: {offender['avg_timing_score']}/100",
                f"   Pattern: {offender['pattern']}",
                "",
            ])

    # Suspicious Tickers
    if data['suspicious_tickers']:
        lines.extend([
            "",
            "📈 MOST SUSPICIOUSLY TRADED STOCKS",
            "=" * 100,
            "",
        ])
        table_data = []
        for ticker_data in data['suspicious_tickers'][:10]:
            table_data.append([
                ticker_data['ticker'],
                ticker_data['trade_count'],
                f"{ticker_data['trades_with_alerts']}/{ticker_data['trade_count']}",
                f"{ticker_data['suspicious_count']}/{ticker_data['trade_count']}",
                f"{ticker_data['suspicious_rate']}%",
                f"{ticker_data['avg_timing_score']}/100",
            ])
        lines.append(tabulate(
            table_data,
            headers=["Ticker", "Total Trades", "With Alerts", "Suspicious", "Susp. Rate", "Avg Score"],
            tablefmt="simple"
        ))
        lines.append("")

    # Sector Analysis
    if data['sector_analysis']:
        lines.extend([
            "",
            "🏭 SECTOR ANALYSIS",
            "=" * 100,
            "",
        ])
        # Sort sectors by suspicious rate (highest first)
        sectors = sorted(
            data['sector_analysis'].items(),
            key=lambda x: x[1].get('suspicious_rate', 0),
            reverse=True
        )
        table_data = []
        for sector, stats in sectors[:10]:
            table_data.append([
                sector,
                stats['trade_count'],
                f"{stats['trades_with_alerts']}/{stats['trade_count']}",
                f"{stats['alert_rate']}%",
                f"{stats['suspicious_count']}/{stats['trade_count']}",
                f"{stats['suspicious_rate']}%",
                f"{stats['avg_timing_score']}/100",
            ])
        lines.append(tabulate(
            table_data,
            headers=["Sector", "Trades", "W/ Alerts", "Alert %", "Suspicious", "Susp %", "Avg Score"],
            tablefmt="simple"
        ))
        lines.append("")

    # Party Comparison
    if data['party_comparison']:
        lines.extend([
            "",
            "🏛️ PARTY COMPARISON",
            "=" * 100,
            "",
        ])
        table_data = []
        for party, stats in sorted(data['party_comparison'].items()):
            table_data.append([
                party,
                stats['official_count'],
                stats['total_trades'],
                f"{stats['total_suspicious']}/{stats['total_trades']}",
                f"{stats['suspicious_rate']}%",
                f"{stats['avg_timing_score']}/100",
            ])
        lines.append(tabulate(
            table_data,
            headers=["Party", "Officials", "Total Trades", "Suspicious", "Susp. Rate", "Avg Score"],
            tablefmt="simple"
        ))
        lines.append("")

    # Footer: static interpretation guide + disclaimer.
    lines.extend([
        "",
        "=" * 100,
        "📋 INTERPRETATION GUIDE",
        "=" * 100,
        "",
        "Timing Score Ranges:",
        "  🚨 80-100: Highly suspicious - Strong evidence of timing advantage",
        "  🔴 60-79:  Suspicious - Likely timing advantage",
        "  🟡 40-59:  Notable - Some unusual activity",
        "  ✅ 0-39:   Normal - No significant pattern",
        "",
        "Suspicious Rate:",
        "  50%+ = Repeat offender pattern",
        "  25-50% = Concerning frequency",
        "  <25% = Within normal range",
        "",
        "⚠️  DISCLAIMER:",
        "  This analysis is for research and transparency purposes only.",
        "  High scores indicate statistical anomalies requiring further investigation.",
        "  This is not legal proof of wrongdoing.",
        "",
        "=" * 100,
    ])
    return "\n".join(lines)
# Script entry point: invoke the click command only when run directly.
if __name__ == "__main__":
    main()

View File

@@ -6,6 +6,7 @@ Real-time tracking of unusual market activity.
from .alert_manager import AlertManager
from .disclosure_correlator import DisclosureCorrelator
from .market_monitor import MarketMonitor
from .pattern_detector import PatternDetector

# Public API of the monitoring package.
# FIX: removed a duplicate earlier `__all__` assignment that omitted
# PatternDetector and was immediately overwritten (dead code).
__all__ = ["MarketMonitor", "AlertManager", "DisclosureCorrelator", "PatternDetector"]

View File

@@ -0,0 +1,359 @@
"""
Pattern detection across officials and stocks.
Identifies recurring suspicious behavior and trading patterns.
"""
import logging
from datetime import date, timedelta
from decimal import Decimal
from typing import Any
from sqlalchemy import and_, func
from sqlalchemy.orm import Session
from pote.db.models import MarketAlert, Official, Security, Trade
from pote.monitoring.disclosure_correlator import DisclosureCorrelator
logger = logging.getLogger(__name__)
class PatternDetector:
    """
    Detect patterns in congressional trading behavior.
    Identifies repeat offenders and systematic advantages.

    Per-trade timing scoring is delegated to DisclosureCorrelator; this class
    aggregates those scores across officials, tickers, sectors and parties.
    All methods are read-only database queries.
    """

    def __init__(self, session: Session):
        """Initialize pattern detector.

        Args:
            session: Active SQLAlchemy session used for all queries.
        """
        self.session = session
        # Per-trade timing analysis is delegated to the correlator.
        self.correlator = DisclosureCorrelator(session)

    def rank_officials_by_timing(
        self, lookback_days: int = 365, min_trades: int = 3
    ) -> list[dict[str, Any]]:
        """
        Rank officials by suspicious timing scores.

        Args:
            lookback_days: Days of history to analyze
            min_trades: Minimum trades to include official

        Returns:
            List of officials ranked by avg timing score (highest first).
            Rates ("alert_rate", "suspicious_rate") are percentages rounded
            to one decimal.
        """
        since_date = date.today() - timedelta(days=lookback_days)
        # Get all officials with recent trades. HAVING enforces min_trades
        # at the SQL level so low-volume officials are never scored.
        officials_with_trades = (
            self.session.query(
                Official.id,
                Official.name,
                Official.chamber,
                Official.party,
                Official.state,
                func.count(Trade.id).label("trade_count"),
            )
            .join(Trade)
            .filter(Trade.transaction_date >= since_date)
            .group_by(Official.id)
            .having(func.count(Trade.id) >= min_trades)
            .all()
        )
        logger.info(
            f"Analyzing {len(officials_with_trades)} officials with {min_trades}+ trades"
        )
        rankings = []
        for official_data in officials_with_trades:
            official_id, name, chamber, party, state, trade_count = official_data
            # Get timing pattern — one correlator call per official.
            pattern = self.correlator.get_official_timing_pattern(
                official_id, lookback_days
            )
            if pattern["trade_count"] == 0:
                continue
            # Calculate percentages. NOTE: the inline guards are redundant
            # after the zero-count check above; kept as written.
            alert_rate = (
                pattern["trades_with_prior_alerts"] / pattern["trade_count"]
                if pattern["trade_count"] > 0
                else 0
            )
            suspicious_rate = (
                pattern["suspicious_trade_count"] / pattern["trade_count"]
                if pattern["trade_count"] > 0
                else 0
            )
            rankings.append(
                {
                    "official_id": official_id,
                    "name": name,
                    "chamber": chamber,
                    "party": party,
                    "state": state,
                    # trade_count comes from the correlator, not the SQL count.
                    "trade_count": pattern["trade_count"],
                    "trades_with_alerts": pattern["trades_with_prior_alerts"],
                    "suspicious_trades": pattern["suspicious_trade_count"],
                    "highly_suspicious_trades": pattern["highly_suspicious_count"],
                    "avg_timing_score": pattern["avg_timing_score"],
                    "alert_rate": round(alert_rate * 100, 1),
                    "suspicious_rate": round(suspicious_rate * 100, 1),
                    "pattern": pattern["pattern"],
                }
            )
        # Sort by average timing score (descending)
        rankings.sort(key=lambda x: x["avg_timing_score"], reverse=True)
        return rankings

    def identify_repeat_offenders(
        self, lookback_days: int = 365, min_suspicious_rate: float = 0.5
    ) -> list[dict[str, Any]]:
        """
        Identify officials with consistent suspicious timing.

        Args:
            lookback_days: Days of history
            min_suspicious_rate: Minimum fraction (0-1) of suspicious trades

        Returns:
            List of repeat offenders (same dicts as rank_officials_by_timing)
        """
        # Require 5+ trades so a couple of coincidences can't flag someone.
        rankings = self.rank_officials_by_timing(lookback_days, min_trades=5)
        # Filter for high suspicious rates. Rankings store a percentage,
        # the parameter is a fraction — hence the * 100.
        offenders = [
            r for r in rankings if r["suspicious_rate"] >= min_suspicious_rate * 100
        ]
        logger.info(
            f"Found {len(offenders)} officials with {min_suspicious_rate*100}%+ suspicious trades"
        )
        return offenders

    def analyze_ticker_patterns(
        self, lookback_days: int = 365, min_trades: int = 3
    ) -> list[dict[str, Any]]:
        """
        Analyze which tickers show most suspicious trading patterns.

        Args:
            lookback_days: Days of history
            min_trades: Minimum trades to include ticker

        Returns:
            List of tickers ranked by avg timing score (highest first)
        """
        since_date = date.today() - timedelta(days=lookback_days)
        # Get tickers with enough trades
        tickers_with_trades = (
            self.session.query(
                Security.ticker, func.count(Trade.id).label("trade_count")
            )
            .join(Trade)
            .filter(Trade.transaction_date >= since_date)
            .group_by(Security.ticker)
            .having(func.count(Trade.id) >= min_trades)
            .all()
        )
        logger.info(f"Analyzing {len(tickers_with_trades)} tickers")
        ticker_patterns = []
        for ticker, trade_count in tickers_with_trades:
            # NOTE: trade_count from the SQL query is unused below; the
            # correlator recomputes its own count for the same window.
            analysis = self.correlator.get_ticker_timing_analysis(
                ticker, lookback_days
            )
            if analysis["trade_count"] == 0:
                continue
            # Guard is redundant after the zero-count check; kept as written.
            suspicious_rate = (
                analysis["suspicious_count"] / analysis["trade_count"]
                if analysis["trade_count"] > 0
                else 0
            )
            ticker_patterns.append(
                {
                    "ticker": ticker,
                    "trade_count": analysis["trade_count"],
                    "trades_with_alerts": analysis["trades_with_alerts"],
                    "suspicious_count": analysis["suspicious_count"],
                    "avg_timing_score": analysis["avg_timing_score"],
                    "suspicious_rate": round(suspicious_rate * 100, 1),
                }
            )
        # Sort by average timing score
        ticker_patterns.sort(key=lambda x: x["avg_timing_score"], reverse=True)
        return ticker_patterns

    def get_sector_timing_analysis(
        self, lookback_days: int = 365
    ) -> dict[str, dict[str, Any]]:
        """
        Analyze timing patterns by sector.

        Args:
            lookback_days: Days of history

        Returns:
            Dict mapping sector name to timing stats (counts, avg score,
            alert/suspicious rates as percentages)
        """
        since_date = date.today() - timedelta(days=lookback_days)
        # Get trades grouped by sector (grouping happens in Python below;
        # the query just fetches the window).
        trades = (
            self.session.query(Trade)
            .join(Trade.security)
            .filter(Trade.transaction_date >= since_date)
            .all()
        )
        logger.info(f"Analyzing {len(trades)} trades by sector")
        sector_stats: dict[str, dict[str, Any]] = {}
        for trade in trades:
            # Trades without a classified security/sector are skipped entirely.
            if not trade.security or not trade.security.sector:
                continue
            sector = trade.security.sector
            if sector not in sector_stats:
                sector_stats[sector] = {
                    "trade_count": 0,
                    "trades_with_alerts": 0,
                    "suspicious_count": 0,
                    "total_timing_score": 0,
                }
            # Analyze this trade — one correlator call per trade.
            analysis = self.correlator.analyze_trade(trade)
            sector_stats[sector]["trade_count"] += 1
            sector_stats[sector]["total_timing_score"] += analysis["timing_score"]
            if analysis["alert_count"] > 0:
                sector_stats[sector]["trades_with_alerts"] += 1
            if analysis["suspicious"]:
                sector_stats[sector]["suspicious_count"] += 1
        # Calculate averages (rates as percentages, one decimal)
        for sector, stats in sector_stats.items():
            if stats["trade_count"] > 0:
                stats["avg_timing_score"] = round(
                    stats["total_timing_score"] / stats["trade_count"], 2
                )
                stats["alert_rate"] = round(
                    stats["trades_with_alerts"] / stats["trade_count"] * 100, 1
                )
                stats["suspicious_rate"] = round(
                    stats["suspicious_count"] / stats["trade_count"] * 100, 1
                )
        return sector_stats

    def get_party_comparison(
        self, lookback_days: int = 365
    ) -> dict[str, dict[str, Any]]:
        """
        Compare timing patterns between political parties.

        Args:
            lookback_days: Days of history

        Returns:
            Dict mapping party to timing stats (incl. the per-official
            ranking dicts under "officials")
        """
        # min_trades=1 so every trading official contributes to their party.
        rankings = self.rank_officials_by_timing(lookback_days, min_trades=1)
        party_stats: dict[str, dict[str, Any]] = {}
        for ranking in rankings:
            party = ranking["party"]
            if party not in party_stats:
                party_stats[party] = {
                    "official_count": 0,
                    "total_trades": 0,
                    "total_suspicious": 0,
                    "total_timing_score": 0,
                    "officials": [],
                }
            party_stats[party]["official_count"] += 1
            party_stats[party]["total_trades"] += ranking["trade_count"]
            party_stats[party]["total_suspicious"] += ranking["suspicious_trades"]
            # Weight each official's average by trade count so the party
            # average is per-trade, not per-official.
            party_stats[party]["total_timing_score"] += (
                ranking["avg_timing_score"] * ranking["trade_count"]
            )
            party_stats[party]["officials"].append(ranking)
        # Calculate averages
        for party, stats in party_stats.items():
            if stats["total_trades"] > 0:
                stats["avg_timing_score"] = round(
                    stats["total_timing_score"] / stats["total_trades"], 2
                )
                stats["suspicious_rate"] = round(
                    stats["total_suspicious"] / stats["total_trades"] * 100, 1
                )
        return party_stats

    def generate_pattern_report(self, lookback_days: int = 365) -> dict[str, Any]:
        """
        Generate comprehensive pattern analysis report.

        Args:
            lookback_days: Days of history

        Returns:
            Complete pattern analysis: summary stats plus top-10 official
            and ticker lists, sector and party breakdowns.
        """
        logger.info(f"Generating comprehensive pattern report for last {lookback_days} days")
        # Get all analyses
        official_rankings = self.rank_officials_by_timing(lookback_days, min_trades=3)
        repeat_offenders = self.identify_repeat_offenders(lookback_days)
        ticker_patterns = self.analyze_ticker_patterns(lookback_days, min_trades=3)
        sector_analysis = self.get_sector_timing_analysis(lookback_days)
        party_comparison = self.get_party_comparison(lookback_days)
        # Calculate summary statistics
        total_officials = len(official_rankings)
        total_offenders = len(repeat_offenders)
        # Unweighted mean of per-official averages (not per-trade).
        avg_timing_score = (
            sum(r["avg_timing_score"] for r in official_rankings) / total_officials
            if total_officials > 0
            else 0
        )
        return {
            "period_days": lookback_days,
            "summary": {
                "total_officials_analyzed": total_officials,
                "repeat_offenders": total_offenders,
                "avg_timing_score": round(avg_timing_score, 2),
            },
            "top_suspicious_officials": official_rankings[:10],
            "repeat_offenders": repeat_offenders,
            "suspicious_tickers": ticker_patterns[:10],
            "sector_analysis": sector_analysis,
            "party_comparison": party_comparison,
        }

View File

@@ -0,0 +1,325 @@
"""Tests for pattern detection module."""
import pytest
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal
from pote.monitoring.pattern_detector import PatternDetector
from pote.db.models import Official, Security, Trade, MarketAlert
@pytest.fixture
def multiple_officials_with_patterns(test_db_session):
    """Create multiple officials with different timing patterns.

    Builds three distinct profiles:
      * Pelosi       - 5 NVDA buys, each preceded by 2 market alerts (suspicious)
      * Tuberville   - 4 MSFT buys, only the first 2 preceded by an alert (mixed)
      * Clean Trader - 3 XOM buys with no alerts at all (clean baseline)

    Returns:
        Dict with the created "officials" and "securities" lists.
    """
    session = test_db_session
    # Create officials
    pelosi = Official(name="Nancy Pelosi", chamber="House", party="Democrat", state="CA")
    tuberville = Official(name="Tommy Tuberville", chamber="Senate", party="Republican", state="AL")
    clean_trader = Official(name="Clean Trader", chamber="House", party="Independent", state="TX")
    session.add_all([pelosi, tuberville, clean_trader])
    session.flush()  # flush so generated primary keys are usable below
    # Create securities (two Technology tickers, one Energy ticker)
    nvda = Security(ticker="NVDA", name="NVIDIA", sector="Technology")
    msft = Security(ticker="MSFT", name="Microsoft", sector="Technology")
    xom = Security(ticker="XOM", name="Exxon", sector="Energy")
    session.add_all([nvda, msft, xom])
    session.flush()
    # Pelosi - Suspicious pattern (trades with alerts), one trade per ~month
    for i in range(5):
        trade_date = date(2024, 1, 15) + timedelta(days=i*30)
        # Create trade
        trade = Trade(
            official_id=pelosi.id,
            security_id=nvda.id,
            source="test",
            transaction_date=trade_date,
            side="buy",
            value_min=Decimal("15001"),
            value_max=Decimal("50000"),
        )
        session.add(trade)
        session.flush()
        # Create alerts BEFORE trade (suspicious): 3 and 4 days prior,
        # timestamped at UTC midnight, severity 7 and 8.
        for j in range(2):
            alert = MarketAlert(
                ticker="NVDA",
                alert_type="unusual_volume",
                timestamp=datetime.combine(
                    trade_date - timedelta(days=3+j),
                    datetime.min.time()
                ).replace(tzinfo=timezone.utc),
                severity=7 + j,
            )
            session.add(alert)
    # Tuberville - Mixed pattern
    for i in range(4):
        trade_date = date(2024, 2, 1) + timedelta(days=i*30)
        trade = Trade(
            official_id=tuberville.id,
            security_id=msft.id,
            source="test",
            transaction_date=trade_date,
            side="buy",
            value_min=Decimal("10000"),
            value_max=Decimal("50000"),
        )
        session.add(trade)
        session.flush()
        # Only first 2 trades have alerts (5 days prior, severity 6)
        if i < 2:
            alert = MarketAlert(
                ticker="MSFT",
                alert_type="price_spike",
                timestamp=datetime.combine(
                    trade_date - timedelta(days=5),
                    datetime.min.time()
                ).replace(tzinfo=timezone.utc),
                severity=6,
            )
            session.add(alert)
    # Clean trader - No suspicious activity (no alerts created at all)
    for i in range(3):
        trade_date = date(2024, 3, 1) + timedelta(days=i*30)
        trade = Trade(
            official_id=clean_trader.id,
            security_id=xom.id,
            source="test",
            transaction_date=trade_date,
            side="buy",
            value_min=Decimal("10000"),
            value_max=Decimal("50000"),
        )
        session.add(trade)
    session.commit()
    return {
        "officials": [pelosi, tuberville, clean_trader],
        "securities": [nvda, msft, xom],
    }
def test_rank_officials_by_timing(test_db_session, multiple_officials_with_patterns):
    """Test ranking officials by timing scores."""
    detector = PatternDetector(test_db_session)

    ranked = detector.rank_officials_by_timing(lookback_days=3650, min_trades=3)

    # The fixture provides at least two officials with 3+ trades.
    assert len(ranked) >= 2

    # Sorted by avg_timing_score, descending.
    for higher, lower in zip(ranked, ranked[1:]):
        assert higher["avg_timing_score"] >= lower["avg_timing_score"]

    # Every entry carries the expected fields.
    expected_keys = (
        "name", "party", "chamber", "trade_count",
        "avg_timing_score", "suspicious_rate",
    )
    for entry in ranked:
        for key in expected_keys:
            assert key in entry
def test_identify_repeat_offenders(test_db_session, multiple_officials_with_patterns):
    """Test identifying repeat offenders."""
    detector = PatternDetector(test_db_session)

    # A 70%+ threshold should still catch Pelosi (100% suspicious rate).
    offenders = detector.identify_repeat_offenders(
        lookback_days=3650,
        min_suspicious_rate=0.7,
    )

    assert isinstance(offenders, list)
    # Everyone returned must clear the threshold.
    assert all(entry["suspicious_rate"] >= 70 for entry in offenders)
def test_analyze_ticker_patterns(test_db_session, multiple_officials_with_patterns):
    """Test ticker pattern analysis."""
    detector = PatternDetector(test_db_session)

    patterns = detector.analyze_ticker_patterns(lookback_days=3650, min_trades=3)

    assert isinstance(patterns, list)
    assert len(patterns) >= 1  # At least NVDA should qualify

    # Result is ordered by avg timing score, highest first.
    scores = [entry["avg_timing_score"] for entry in patterns]
    assert scores == sorted(scores, reverse=True)

    # Required fields are present on every entry.
    for entry in patterns:
        for key in ("ticker", "trade_count", "avg_timing_score", "suspicious_rate"):
            assert key in entry
def test_get_sector_timing_analysis(test_db_session, multiple_officials_with_patterns):
    """Test sector timing analysis."""
    detector = PatternDetector(test_db_session)

    stats_by_sector = detector.get_sector_timing_analysis(lookback_days=3650)

    assert isinstance(stats_by_sector, dict)
    assert len(stats_by_sector) >= 2  # Technology and Energy

    # Technology aggregates the alert-heavy NVDA and MSFT trades.
    tech = stats_by_sector.get("Technology")
    if tech is not None:
        assert tech["trade_count"] >= 9  # 5 NVDA + 4 MSFT
        for key in ("avg_timing_score", "alert_rate", "suspicious_rate"):
            assert key in tech
def test_get_party_comparison(test_db_session, multiple_officials_with_patterns):
    """Test party comparison analysis."""
    detector = PatternDetector(test_db_session)

    by_party = detector.get_party_comparison(lookback_days=3650)

    assert isinstance(by_party, dict)
    assert len(by_party) >= 2  # Democrat, Republican, Independent

    # Each party entry carries the full stat set.
    required = ("official_count", "total_trades", "avg_timing_score", "suspicious_rate")
    for stats in by_party.values():
        assert all(key in stats for key in required)
def test_generate_pattern_report(test_db_session, multiple_officials_with_patterns):
    """Test comprehensive pattern report generation."""
    detector = PatternDetector(test_db_session)

    report = detector.generate_pattern_report(lookback_days=3650)

    # All top-level sections are present.
    for section in (
        "period_days",
        "summary",
        "top_suspicious_officials",
        "repeat_offenders",
        "suspicious_tickers",
        "sector_analysis",
        "party_comparison",
    ):
        assert section in report

    # Summary reflects the fixture data.
    assert report["summary"]["total_officials_analyzed"] >= 2
    assert "avg_timing_score" in report["summary"]

    # The list sections are populated.
    assert len(report["top_suspicious_officials"]) >= 2
    assert isinstance(report["suspicious_tickers"], list)
def test_rank_officials_min_trades_filter(test_db_session, multiple_officials_with_patterns):
    """Test that min_trades filter works correctly."""
    detector = PatternDetector(test_db_session)

    strict = detector.rank_officials_by_timing(lookback_days=3650, min_trades=5)
    lenient = detector.rank_officials_by_timing(lookback_days=3650, min_trades=3)

    # A stricter threshold can never return more officials.
    assert len(lenient) >= len(strict)

    # Everyone under the strict filter genuinely has 5+ trades.
    assert all(entry["trade_count"] >= 5 for entry in strict)
def test_empty_data_handling(test_db_session):
    """Test handling of empty dataset."""
    detector = PatternDetector(test_db_session)

    # Every analysis degrades to an empty container when no trades exist.
    assert detector.rank_officials_by_timing(lookback_days=30, min_trades=1) == []
    assert detector.identify_repeat_offenders(lookback_days=30) == []
    assert detector.analyze_ticker_patterns(lookback_days=30) == []
    assert detector.get_sector_timing_analysis(lookback_days=30) == {}
def test_ranking_score_accuracy(test_db_session, multiple_officials_with_patterns):
    """Test that rankings accurately reflect timing patterns."""
    detector = PatternDetector(test_db_session)

    rankings = detector.rank_officials_by_timing(lookback_days=3650, min_trades=3)

    # Locate the suspicious and clean officials from the fixture.
    pelosi_entry = next((r for r in rankings if "Pelosi" in r["name"]), None)
    clean_entry = next((r for r in rankings if "Clean" in r["name"]), None)

    if pelosi_entry and clean_entry:
        # Alert-laden trading must outscore the alert-free baseline.
        assert pelosi_entry["avg_timing_score"] > clean_entry["avg_timing_score"]
        assert pelosi_entry["trades_with_alerts"] > clean_entry["trades_with_alerts"]
def test_sector_stats_accuracy(test_db_session, multiple_officials_with_patterns):
    """Test sector statistics are calculated correctly."""
    detector = PatternDetector(test_db_session)

    sector_stats = detector.get_sector_timing_analysis(lookback_days=3650)

    # The Energy trades were created without any market alerts.
    energy = sector_stats.get("Energy")
    if energy is not None:
        assert energy["suspicious_count"] == 0
        assert energy["alert_rate"] == 0.0
def test_party_stats_completeness(test_db_session, multiple_officials_with_patterns):
    """Test party statistics completeness for the Democrat bucket (Pelosi)."""
    session = test_db_session
    detector = PatternDetector(session)
    party_stats = detector.get_party_comparison(lookback_days=3650)
    # Check Democrats (Pelosi) — fixture gives her 5 alert-preceded trades.
    if "Democrat" in party_stats:
        dem = party_stats["Democrat"]
        assert dem["official_count"] >= 1
        assert dem["total_trades"] >= 5  # Pelosi has 5 trades
        assert dem["total_suspicious"] > 0  # Pelosi has suspicious trades