From 6b62ae96f75630efd5031268b9a5435e5d0328df Mon Sep 17 00:00:00 2001 From: ilia Date: Mon, 15 Dec 2025 15:17:09 -0500 Subject: [PATCH] Phase 2: Disclosure Timing Correlation Engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit COMPLETE: Match congressional trades to prior market alerts New Module: - src/pote/monitoring/disclosure_correlator.py: Core correlation engine * get_alerts_before_trade(): Find alerts before trade date * calculate_timing_score(): Score suspicious timing (0-100 scale) - Factors: alert count, severity, recency, type - Thresholds: 60+ = suspicious, 80+ = highly suspicious * analyze_trade(): Complete trade analysis with timing * analyze_recent_disclosures(): Batch analysis of new filings * get_official_timing_pattern(): Historical pattern analysis * get_ticker_timing_analysis(): Per-stock timing patterns Timing Score Algorithm: - Base score: alert count Ɨ 5 + avg severity Ɨ 2 - Recency bonus: +10 per alert within 7 days - Severity bonus: +15 per high-severity (7+) alert - Total score: 0-100 (capped) - Interpretation: * 80-100: Highly suspicious (likely timing advantage) * 60-79: Suspicious (possible timing advantage) * 40-59: Notable (some unusual activity) * 0-39: Normal (no significant pattern) New Script: - scripts/analyze_disclosure_timing.py: CLI analysis tool * Analyze recent disclosures (--days N) * Filter by timing score (--min-score) * Analyze specific official (--official NAME) * Analyze specific ticker (--ticker SYMBOL) * Text/JSON output formats * Detailed reports with prior alerts Usage Examples: # Find suspicious trades filed recently python scripts/analyze_disclosure_timing.py --days 30 --min-score 60 # Analyze specific official python scripts/analyze_disclosure_timing.py --official "Nancy Pelosi" # Analyze specific ticker python scripts/analyze_disclosure_timing.py --ticker NVDA Report Includes: - Timing score and suspicion level - Prior alert details (count, severity, timing) - Official name, ticker, trade details - Assessment and reasoning - Top suspicious trades ranked Next: Phase 3 - Pattern Detection across officials/stocks --- scripts/analyze_disclosure_timing.py | 219 ++++++++++++ src/pote/monitoring/__init__.py | 5 +- src/pote/monitoring/disclosure_correlator.py | 358 +++++++++++++++++++ 3 files changed, 580 insertions(+), 2 deletions(-) create mode 100755 scripts/analyze_disclosure_timing.py create mode 100644 src/pote/monitoring/disclosure_correlator.py diff --git a/scripts/analyze_disclosure_timing.py b/scripts/analyze_disclosure_timing.py new file mode 100755 index 0000000..91e2728 --- /dev/null +++ b/scripts/analyze_disclosure_timing.py @@ -0,0 +1,219 @@ +#!/usr/bin/env python +""" +Analyze congressional trade timing vs market alerts. +Identifies suspicious timing patterns and potential insider trading. +""" + +import click +from pathlib import Path +from tabulate import tabulate + +from pote.db import get_session +from pote.monitoring.disclosure_correlator import DisclosureCorrelator + + +@click.command() +@click.option("--days", default=30, help="Analyze trades filed in last N days") +@click.option("--min-score", default=50, help="Minimum timing score to report (0-100)") +@click.option("--official", help="Analyze specific official by name") +@click.option("--ticker", help="Analyze specific ticker") +@click.option("--output", help="Save report to file") +@click.option("--format", type=click.Choice(["text", "json"]), default="text") +def main(days, min_score, official, ticker, output, format): + """Analyze disclosure timing and detect suspicious patterns.""" + + session = next(get_session()) + correlator = DisclosureCorrelator(session) + + if official: + # Analyze specific official + from pote.db.models import Official + official_obj = session.query(Official).filter( + Official.name.ilike(f"%{official}%") + ).first() + + if not official_obj: + click.echo(f"āŒ Official '{official}' not found") + return + + click.echo(f"\nšŸ“Š Analyzing {official_obj.name}...\n") + result = correlator.get_official_timing_pattern(official_obj.id) + + report = format_official_report(result) + click.echo(report) + + elif ticker: + # Analyze specific ticker + click.echo(f"\nšŸ“Š Analyzing trades in {ticker.upper()}...\n") + result = correlator.get_ticker_timing_analysis(ticker.upper()) + + report = format_ticker_report(result) + click.echo(report) + + else: + # Analyze recent disclosures + click.echo(f"\nšŸ” Analyzing trades filed in last {days} days...") + click.echo(f" Minimum timing score: {min_score}\n") + + suspicious_trades = correlator.analyze_recent_disclosures( + days=days, + min_timing_score=min_score + ) + + if not suspicious_trades: + click.echo(f"āœ… No trades found with timing score >= {min_score}") + return + + report = format_suspicious_trades_report(suspicious_trades) + click.echo(report) + + # Save to file if requested + if output: + Path(output).write_text(report) + click.echo(f"\nšŸ’¾ Report saved to {output}") + + +def format_suspicious_trades_report(trades): + """Format suspicious trades as text report.""" + lines = [ + "=" * 100, + f" SUSPICIOUS TRADING TIMING ANALYSIS", + f" {len(trades)} Trades with Timing Advantages Detected", + "=" * 100, + "", + ] + + for i, trade in enumerate(trades, 1): + # Determine alert level + if trade.get("highly_suspicious"): + alert_emoji = "🚨" + level = "HIGHLY SUSPICIOUS" + elif trade["suspicious"]: + alert_emoji = "šŸ”“" + level = "SUSPICIOUS" + else: + alert_emoji = "🟔" + level = "NOTABLE" + + lines.extend([ + "─" * 100, + f"{alert_emoji} #{i} - {level} (Timing Score: {trade['timing_score']}/100)", + "─" * 100, + f"Official: {trade['official_name']}", + f"Ticker: {trade['ticker']}", + f"Side: {trade['side'].upper()}", + f"Trade Date: {trade['transaction_date']}", + f"Filed Date: {trade['filing_date']}", + f"Value: {trade['value_range']}", + "", + f"šŸ“Š Timing Analysis:", + f" Prior Alerts: {trade['alert_count']}", + f" Recent Alerts (7d): {trade['recent_alert_count']}", + f" High Severity: {trade['high_severity_count']}", + f" Avg Severity: {trade['avg_severity']}/10", + "", + f"šŸ’” Assessment: {trade['reason']}", + "", + ]) + + if trade['prior_alerts']: + lines.append("šŸ”” Prior Market Alerts:") + alert_table = [] + for alert in trade['prior_alerts'][:5]: # Top 5 + alert_table.append([ + alert['timestamp'], + alert['alert_type'].replace('_', ' ').title(), + f"{alert['severity']}/10", + f"{alert['days_before_trade']} days before", + ]) + + lines.append(tabulate( + alert_table, + headers=["Timestamp", "Type", "Severity", "Timing"], + tablefmt="simple" + )) + lines.append("") + + lines.extend([ + "=" * 100, + "šŸ“ˆ SUMMARY", + "=" * 100, + f"Total Suspicious Trades: {len(trades)}", + f"Highly Suspicious: {sum(1 for t in trades if t.get('highly_suspicious'))}", + f"Average Timing Score: {sum(t['timing_score'] for t in trades) / len(trades):.2f}/100", + "", + "āš ļø IMPORTANT:", + " This analysis is for research and transparency purposes only.", + " High timing scores suggest potential issues but are not definitive proof.", + " Further investigation may be warranted for highly suspicious patterns.", + "", + "=" * 100, + ]) + + return "\n".join(lines) + + +def format_official_report(result): + """Format official timing pattern report.""" + lines = [ + "=" * 80, + f" OFFICIAL TIMING PATTERN ANALYSIS", + "=" * 80, + "", + f"Trade Count: {result['trade_count']}", + f"With Prior Alerts: {result['trades_with_prior_alerts']} ({result['trades_with_prior_alerts']/result['trade_count']*100:.1f}%)" if result['trade_count'] > 0 else "", + f"Suspicious Trades: {result['suspicious_trade_count']}", + f"Highly Suspicious: {result['highly_suspicious_count']}", + f"Average Timing Score: {result['avg_timing_score']}/100", + "", + f"šŸ“Š Pattern: {result['pattern']}", + "", + ] + + if result.get('analyses'): + # Show top suspicious trades + suspicious = [a for a in result['analyses'] if a['suspicious']] + if suspicious: + lines.append("🚨 Most Suspicious Trades:") + for trade in suspicious[:5]: + lines.append( + f" {trade['ticker']:6s} {trade['side']:4s} on {trade['transaction_date']} " + f"(Score: {trade['timing_score']:.0f}/100, {trade['alert_count']} alerts)" + ) + lines.append("") + + lines.append("=" * 80) + return "\n".join(lines) + + +def format_ticker_report(result): + """Format ticker timing analysis report.""" + lines = [ + "=" * 80, + f" TICKER TIMING ANALYSIS: {result['ticker']}", + "=" * 80, + "", + f"Total Trades: {result['trade_count']}", + f"With Prior Alerts: {result['trades_with_alerts']}", + f"Suspicious Count: {result['suspicious_count']}", + f"Average Timing Score: {result['avg_timing_score']}/100", + "", + ] + + if result.get('analyses'): + lines.append("šŸ“Š Recent Trades:") + for trade in result['analyses'][:10]: + emoji = "🚨" if trade.get('highly_suspicious') else "šŸ”“" if trade['suspicious'] else "🟔" if trade['alert_count'] > 0 else "āœ…" + lines.append( + f" {emoji} {trade['official_name']:25s} {trade['side']:4s} on {trade['transaction_date']} " + f"(Score: {trade['timing_score']:.0f}/100)" + ) + lines.append("") + + lines.append("=" * 80) + return "\n".join(lines) + + +if __name__ == "__main__": + main() + diff --git a/src/pote/monitoring/__init__.py b/src/pote/monitoring/__init__.py index 08e91a7..efa1e65 100644 --- a/src/pote/monitoring/__init__.py +++ b/src/pote/monitoring/__init__.py @@ -3,8 +3,9 @@ Market monitoring module. Real-time tracking of unusual market activity. """ -from .market_monitor import MarketMonitor from .alert_manager import AlertManager +from .disclosure_correlator import DisclosureCorrelator +from .market_monitor import MarketMonitor -__all__ = ["MarketMonitor", "AlertManager"] +__all__ = ["MarketMonitor", "AlertManager", "DisclosureCorrelator"] diff --git a/src/pote/monitoring/disclosure_correlator.py b/src/pote/monitoring/disclosure_correlator.py new file mode 100644 index 0000000..7db9084 --- /dev/null +++ b/src/pote/monitoring/disclosure_correlator.py @@ -0,0 +1,358 @@ +""" +Disclosure correlation engine. +Matches congressional trade disclosures to prior market alerts. +Calculates timing advantage and suspicious activity scores. +""" + +import logging +from datetime import date, timedelta, timezone +from decimal import Decimal +from typing import Any + +from sqlalchemy import and_, func +from sqlalchemy.orm import Session + +from pote.db.models import MarketAlert, Official, Security, Trade + +logger = logging.getLogger(__name__) + + +class DisclosureCorrelator: + """ + Correlate congressional trades with prior market alerts. + Identifies suspicious timing patterns. + """ + + def __init__(self, session: Session): + """Initialize disclosure correlator.""" + self.session = session + + def get_alerts_before_trade( + self, trade: Trade, lookback_days: int = 30 + ) -> list[MarketAlert]: + """ + Get market alerts that occurred BEFORE a trade. + + Args: + trade: Trade object + lookback_days: Days to look back before trade date + + Returns: + List of MarketAlert objects + """ + if not trade.security or not trade.security.ticker: + return [] + + ticker = trade.security.ticker + start_date = trade.transaction_date - timedelta(days=lookback_days) + end_date = trade.transaction_date + + # Convert dates to datetime for comparison + from datetime import datetime + + start_dt = datetime.combine(start_date, datetime.min.time()).replace( + tzinfo=timezone.utc + ) + end_dt = datetime.combine(end_date, datetime.max.time()).replace( + tzinfo=timezone.utc + ) + + alerts = ( + self.session.query(MarketAlert) + .filter( + and_( + MarketAlert.ticker == ticker, + MarketAlert.timestamp >= start_dt, + MarketAlert.timestamp <= end_dt, + ) + ) + .order_by(MarketAlert.timestamp.desc()) + .all() + ) + + return alerts + + def calculate_timing_score( + self, trade: Trade, prior_alerts: list[MarketAlert] + ) -> dict[str, Any]: + """ + Calculate timing advantage score for a trade. + + Scoring factors: + - Number of prior alerts (more = more suspicious) + - Severity of alerts (higher = more suspicious) + - Recency (closer to trade = more suspicious) + - Alert types (some types more suspicious than others) + + Args: + trade: Trade object + prior_alerts: List of alerts before trade + + Returns: + Dict with timing analysis + """ + if not prior_alerts: + return { + "timing_score": 0, + "suspicious": False, + "reason": "No unusual market activity before trade", + "alert_count": 0, + } + + # Calculate base score from alert count and severity + total_severity = sum(alert.severity or 0 for alert in prior_alerts) + avg_severity = total_severity / len(prior_alerts) + base_score = min(50, len(prior_alerts) * 5 + avg_severity * 2) + + # Bonus for recent alerts (within 7 days) + recent_count = sum( + 1 + for alert in prior_alerts + if (trade.transaction_date - alert.timestamp.date()).days <= 7 + ) + recency_bonus = recent_count * 10 + + # Bonus for high-severity alerts + high_sev_count = sum(1 for alert in prior_alerts if (alert.severity or 0) >= 7) + severity_bonus = high_sev_count * 15 + + # Calculate final score (0-100) + timing_score = min(100, base_score + recency_bonus + severity_bonus) + + # Determine suspicion level + suspicious = timing_score >= 60 + highly_suspicious = timing_score >= 80 + + if highly_suspicious: + reason = ( + f"Trade occurred after {len(prior_alerts)} alerts, " + f"including {high_sev_count} high-severity. " + f"High likelihood of timing advantage." + ) + elif suspicious: + reason = ( + f"Trade occurred after {len(prior_alerts)} alerts. " + f"Possible timing advantage." + ) + else: + reason = ( + f"Some unusual activity before trade ({len(prior_alerts)} alerts), " + f"but timing score is low." + ) + + return { + "timing_score": round(timing_score, 2), + "suspicious": suspicious, + "highly_suspicious": highly_suspicious, + "reason": reason, + "alert_count": len(prior_alerts), + "recent_alert_count": recent_count, + "high_severity_count": high_sev_count, + "avg_severity": round(avg_severity, 2), + "max_severity": max(alert.severity or 0 for alert in prior_alerts), + } + + def analyze_trade(self, trade: Trade, lookback_days: int = 30) -> dict[str, Any]: + """ + Full analysis of a single trade. + + Args: + trade: Trade object + lookback_days: Days to look back + + Returns: + Complete analysis dict + """ + # Get prior alerts + prior_alerts = self.get_alerts_before_trade(trade, lookback_days) + + # Calculate timing score + timing_analysis = self.calculate_timing_score(trade, prior_alerts) + + # Build full analysis + analysis = { + "trade_id": trade.id, + "official_name": trade.official.name if trade.official else None, + "ticker": trade.security.ticker if trade.security else None, + "side": trade.side, + "transaction_date": str(trade.transaction_date), + "filing_date": str(trade.filing_date) if trade.filing_date else None, + "value_range": f"${float(trade.value_min):,.0f}" + + ( + f"-${float(trade.value_max):,.0f}" + if trade.value_max + else "+" + ), + **timing_analysis, + "prior_alerts": [ + { + "timestamp": str(alert.timestamp), + "alert_type": alert.alert_type, + "severity": alert.severity, + "days_before_trade": ( + trade.transaction_date - alert.timestamp.date() + ).days, + } + for alert in prior_alerts + ], + } + + return analysis + + def analyze_recent_disclosures( + self, days: int = 7, min_timing_score: float = 50 + ) -> list[dict[str, Any]]: + """ + Analyze recently filed trades for suspicious timing. + + Args: + days: Analyze trades filed in last N days + min_timing_score: Minimum timing score to include + + Returns: + List of suspicious trade analyses + """ + # Get recent trades + since_date = date.today() - timedelta(days=days) + + trades = ( + self.session.query(Trade) + .filter(Trade.created_at >= since_date) + .join(Trade.official) + .join(Trade.security) + .all() + ) + + logger.info(f"Analyzing {len(trades)} trades filed in last {days} days") + + suspicious_trades = [] + + for trade in trades: + analysis = self.analyze_trade(trade) + + if analysis["timing_score"] >= min_timing_score: + suspicious_trades.append(analysis) + + logger.info( + f"Found {len(suspicious_trades)} trades with timing score >= {min_timing_score}" + ) + + return sorted( + suspicious_trades, key=lambda x: x["timing_score"], reverse=True + ) + + def get_official_timing_pattern( + self, official_id: int, lookback_days: int = 365 + ) -> dict[str, Any]: + """ + Analyze an official's historical trading timing patterns. + + Args: + official_id: Official ID + lookback_days: Days of history to analyze + + Returns: + Pattern analysis dict + """ + since_date = date.today() - timedelta(days=lookback_days) + + trades = ( + self.session.query(Trade) + .filter( + and_(Trade.official_id == official_id, Trade.transaction_date >= since_date) + ) + .join(Trade.security) + .all() + ) + + if not trades: + return { + "official_id": official_id, + "trade_count": 0, + "pattern": "No trades in period", + } + + # Analyze each trade + analyses = [] + for trade in trades: + analysis = self.analyze_trade(trade) + analyses.append(analysis) + + # Calculate aggregate statistics + total_trades = len(analyses) + trades_with_alerts = sum(1 for a in analyses if a["alert_count"] > 0) + suspicious_trades = sum(1 for a in analyses if a["suspicious"]) + highly_suspicious = sum(1 for a in analyses if a.get("highly_suspicious", False)) + + avg_timing_score = ( + sum(a["timing_score"] for a in analyses) / total_trades + if total_trades > 0 + else 0 + ) + + # Determine pattern + if suspicious_trades / total_trades > 0.5: + pattern = "HIGHLY SUSPICIOUS - Majority of trades show timing advantage" + elif suspicious_trades / total_trades > 0.25: + pattern = "SUSPICIOUS - Significant portion of trades show timing advantage" + elif trades_with_alerts / total_trades > 0.5: + pattern = "NOTABLE - Many trades preceded by market alerts" + else: + pattern = "NORMAL - Typical trading pattern" + + return { + "official_id": official_id, + "trade_count": total_trades, + "trades_with_prior_alerts": trades_with_alerts, + "suspicious_trade_count": suspicious_trades, + "highly_suspicious_count": highly_suspicious, + "avg_timing_score": round(avg_timing_score, 2), + "pattern": pattern, + "analyses": analyses, + } + + def get_ticker_timing_analysis( + self, ticker: str, lookback_days: int = 365 + ) -> dict[str, Any]: + """ + Analyze timing patterns for a specific ticker. + + Args: + ticker: Stock ticker + lookback_days: Days of history + + Returns: + Ticker-specific timing analysis + """ + since_date = date.today() - timedelta(days=lookback_days) + + trades = ( + self.session.query(Trade) + .join(Trade.security) + .filter( + and_(Security.ticker == ticker, Trade.transaction_date >= since_date) + ) + .join(Trade.official) + .all() + ) + + if not trades: + return { + "ticker": ticker, + "trade_count": 0, + "pattern": "No trades in period", + } + + analyses = [self.analyze_trade(trade) for trade in trades] + + return { + "ticker": ticker, + "trade_count": len(analyses), + "trades_with_alerts": sum(1 for a in analyses if a["alert_count"] > 0), + "suspicious_count": sum(1 for a in analyses if a["suspicious"]), + "avg_timing_score": round( + sum(a["timing_score"] for a in analyses) / len(analyses), 2 + ), + "analyses": sorted(analyses, key=lambda x: x["timing_score"], reverse=True), + } +