Phase 2: Disclosure Timing Correlation Engine

COMPLETE: Match congressional trades to prior market alerts

New Module:
- src/pote/monitoring/disclosure_correlator.py: Core correlation engine
  * get_alerts_before_trade(): Find alerts before trade date
  * calculate_timing_score(): Score suspicious timing (0-100 scale)
    - Factors: alert count, severity, recency, type
    - Thresholds: 60+ = suspicious, 80+ = highly suspicious
  * analyze_trade(): Complete trade analysis with timing
  * analyze_recent_disclosures(): Batch analysis of new filings
  * get_official_timing_pattern(): Historical pattern analysis
  * get_ticker_timing_analysis(): Per-stock timing patterns

Timing Score Algorithm:
- Base score: alert count × 5 + avg severity × 2
- Recency bonus: +10 per alert within 7 days
- Severity bonus: +15 per high-severity (7+) alert
- Total score: 0-100 (capped)
- Interpretation:
  * 80-100: Highly suspicious (likely timing advantage)
  * 60-79: Suspicious (possible timing advantage)
  * 40-59: Notable (some unusual activity)
  * 0-39: Normal (no significant pattern)

New Script:
- scripts/analyze_disclosure_timing.py: CLI analysis tool
  * Analyze recent disclosures (--days N)
  * Filter by timing score (--min-score)
  * Analyze specific official (--official NAME)
  * Analyze specific ticker (--ticker SYMBOL)
  * Text/JSON output formats
  * Detailed reports with prior alerts

Usage Examples:
  # Find suspicious trades filed recently
  python scripts/analyze_disclosure_timing.py --days 30 --min-score 60

  # Analyze specific official
  python scripts/analyze_disclosure_timing.py --official "Nancy Pelosi"

  # Analyze specific ticker
  python scripts/analyze_disclosure_timing.py --ticker NVDA

Report Includes:
- Timing score and suspicion level
- Prior alert details (count, severity, timing)
- Official name, ticker, trade details
- Assessment and reasoning
- Top suspicious trades ranked

Next: Phase 3 - Pattern Detection across officials/stocks
This commit is contained in:
ilia 2025-12-15 15:17:09 -05:00
parent db34f26cdc
commit 6b62ae96f7
3 changed files with 580 additions and 2 deletions

View File

@ -0,0 +1,219 @@
#!/usr/bin/env python
"""
Analyze congressional trade timing vs market alerts.
Identifies suspicious timing patterns and potential insider trading.
"""
import click
from pathlib import Path
from tabulate import tabulate
from pote.db import get_session
from pote.monitoring.disclosure_correlator import DisclosureCorrelator
@click.command()
@click.option("--days", default=30, help="Analyze trades filed in last N days")
@click.option("--min-score", default=50, help="Minimum timing score to report (0-100)")
@click.option("--official", help="Analyze specific official by name")
@click.option("--ticker", help="Analyze specific ticker")
@click.option("--output", help="Save report to file")
@click.option("--format", type=click.Choice(["text", "json"]), default="text")
def main(days, min_score, official, ticker, output, format):
"""Analyze disclosure timing and detect suspicious patterns."""
session = next(get_session())
correlator = DisclosureCorrelator(session)
if official:
# Analyze specific official
from pote.db.models import Official
official_obj = session.query(Official).filter(
Official.name.ilike(f"%{official}%")
).first()
if not official_obj:
click.echo(f"❌ Official '{official}' not found")
return
click.echo(f"\n📊 Analyzing {official_obj.name}...\n")
result = correlator.get_official_timing_pattern(official_obj.id)
report = format_official_report(result)
click.echo(report)
elif ticker:
# Analyze specific ticker
click.echo(f"\n📊 Analyzing trades in {ticker.upper()}...\n")
result = correlator.get_ticker_timing_analysis(ticker.upper())
report = format_ticker_report(result)
click.echo(report)
else:
# Analyze recent disclosures
click.echo(f"\n🔍 Analyzing trades filed in last {days} days...")
click.echo(f" Minimum timing score: {min_score}\n")
suspicious_trades = correlator.analyze_recent_disclosures(
days=days,
min_timing_score=min_score
)
if not suspicious_trades:
click.echo(f"✅ No trades found with timing score >= {min_score}")
return
report = format_suspicious_trades_report(suspicious_trades)
click.echo(report)
# Save to file if requested
if output:
Path(output).write_text(report)
click.echo(f"\n💾 Report saved to {output}")
def format_suspicious_trades_report(trades):
"""Format suspicious trades as text report."""
lines = [
"=" * 100,
f" SUSPICIOUS TRADING TIMING ANALYSIS",
f" {len(trades)} Trades with Timing Advantages Detected",
"=" * 100,
"",
]
for i, trade in enumerate(trades, 1):
# Determine alert level
if trade.get("highly_suspicious"):
alert_emoji = "🚨"
level = "HIGHLY SUSPICIOUS"
elif trade["suspicious"]:
alert_emoji = "🔴"
level = "SUSPICIOUS"
else:
alert_emoji = "🟡"
level = "NOTABLE"
lines.extend([
"" * 100,
f"{alert_emoji} #{i} - {level} (Timing Score: {trade['timing_score']}/100)",
"" * 100,
f"Official: {trade['official_name']}",
f"Ticker: {trade['ticker']}",
f"Side: {trade['side'].upper()}",
f"Trade Date: {trade['transaction_date']}",
f"Filed Date: {trade['filing_date']}",
f"Value: {trade['value_range']}",
"",
f"📊 Timing Analysis:",
f" Prior Alerts: {trade['alert_count']}",
f" Recent Alerts (7d): {trade['recent_alert_count']}",
f" High Severity: {trade['high_severity_count']}",
f" Avg Severity: {trade['avg_severity']}/10",
"",
f"💡 Assessment: {trade['reason']}",
"",
])
if trade['prior_alerts']:
lines.append("🔔 Prior Market Alerts:")
alert_table = []
for alert in trade['prior_alerts'][:5]: # Top 5
alert_table.append([
alert['timestamp'],
alert['alert_type'].replace('_', ' ').title(),
f"{alert['severity']}/10",
f"{alert['days_before_trade']} days before",
])
lines.append(tabulate(
alert_table,
headers=["Timestamp", "Type", "Severity", "Timing"],
tablefmt="simple"
))
lines.append("")
lines.extend([
"=" * 100,
"📈 SUMMARY",
"=" * 100,
f"Total Suspicious Trades: {len(trades)}",
f"Highly Suspicious: {sum(1 for t in trades if t.get('highly_suspicious'))}",
f"Average Timing Score: {sum(t['timing_score'] for t in trades) / len(trades):.2f}/100",
"",
"⚠️ IMPORTANT:",
" This analysis is for research and transparency purposes only.",
" High timing scores suggest potential issues but are not definitive proof.",
" Further investigation may be warranted for highly suspicious patterns.",
"",
"=" * 100,
])
return "\n".join(lines)
def format_official_report(result):
"""Format official timing pattern report."""
lines = [
"=" * 80,
f" OFFICIAL TIMING PATTERN ANALYSIS",
"=" * 80,
"",
f"Trade Count: {result['trade_count']}",
f"With Prior Alerts: {result['trades_with_prior_alerts']} ({result['trades_with_prior_alerts']/result['trade_count']*100:.1f}%)" if result['trade_count'] > 0 else "",
f"Suspicious Trades: {result['suspicious_trade_count']}",
f"Highly Suspicious: {result['highly_suspicious_count']}",
f"Average Timing Score: {result['avg_timing_score']}/100",
"",
f"📊 Pattern: {result['pattern']}",
"",
]
if result.get('analyses'):
# Show top suspicious trades
suspicious = [a for a in result['analyses'] if a['suspicious']]
if suspicious:
lines.append("🚨 Most Suspicious Trades:")
for trade in suspicious[:5]:
lines.append(
f" {trade['ticker']:6s} {trade['side']:4s} on {trade['transaction_date']} "
f"(Score: {trade['timing_score']:.0f}/100, {trade['alert_count']} alerts)"
)
lines.append("")
lines.append("=" * 80)
return "\n".join(lines)
def format_ticker_report(result):
"""Format ticker timing analysis report."""
lines = [
"=" * 80,
f" TICKER TIMING ANALYSIS: {result['ticker']}",
"=" * 80,
"",
f"Total Trades: {result['trade_count']}",
f"With Prior Alerts: {result['trades_with_alerts']}",
f"Suspicious Count: {result['suspicious_count']}",
f"Average Timing Score: {result['avg_timing_score']}/100",
"",
]
if result.get('analyses'):
lines.append("📊 Recent Trades:")
for trade in result['analyses'][:10]:
emoji = "🚨" if trade.get('highly_suspicious') else "🔴" if trade['suspicious'] else "🟡" if trade['alert_count'] > 0 else ""
lines.append(
f" {emoji} {trade['official_name']:25s} {trade['side']:4s} on {trade['transaction_date']} "
f"(Score: {trade['timing_score']:.0f}/100)"
)
lines.append("")
lines.append("=" * 80)
return "\n".join(lines)
if __name__ == "__main__":
main()

View File

@ -3,8 +3,9 @@ Market monitoring module.
Real-time tracking of unusual market activity.
"""
from .market_monitor import MarketMonitor
from .alert_manager import AlertManager
from .disclosure_correlator import DisclosureCorrelator
from .market_monitor import MarketMonitor
__all__ = ["MarketMonitor", "AlertManager"]
__all__ = ["MarketMonitor", "AlertManager", "DisclosureCorrelator"]

View File

@ -0,0 +1,358 @@
"""
Disclosure correlation engine.
Matches congressional trade disclosures to prior market alerts.
Calculates timing advantage and suspicious activity scores.
"""
import logging
from datetime import date, timedelta, timezone
from decimal import Decimal
from typing import Any
from sqlalchemy import and_, func
from sqlalchemy.orm import Session
from pote.db.models import MarketAlert, Official, Security, Trade
logger = logging.getLogger(__name__)
class DisclosureCorrelator:
"""
Correlate congressional trades with prior market alerts.
Identifies suspicious timing patterns.
"""
def __init__(self, session: Session):
"""Initialize disclosure correlator."""
self.session = session
def get_alerts_before_trade(
self, trade: Trade, lookback_days: int = 30
) -> list[MarketAlert]:
"""
Get market alerts that occurred BEFORE a trade.
Args:
trade: Trade object
lookback_days: Days to look back before trade date
Returns:
List of MarketAlert objects
"""
if not trade.security or not trade.security.ticker:
return []
ticker = trade.security.ticker
start_date = trade.transaction_date - timedelta(days=lookback_days)
end_date = trade.transaction_date
# Convert dates to datetime for comparison
from datetime import datetime
start_dt = datetime.combine(start_date, datetime.min.time()).replace(
tzinfo=timezone.utc
)
end_dt = datetime.combine(end_date, datetime.max.time()).replace(
tzinfo=timezone.utc
)
alerts = (
self.session.query(MarketAlert)
.filter(
and_(
MarketAlert.ticker == ticker,
MarketAlert.timestamp >= start_dt,
MarketAlert.timestamp <= end_dt,
)
)
.order_by(MarketAlert.timestamp.desc())
.all()
)
return alerts
def calculate_timing_score(
self, trade: Trade, prior_alerts: list[MarketAlert]
) -> dict[str, Any]:
"""
Calculate timing advantage score for a trade.
Scoring factors:
- Number of prior alerts (more = more suspicious)
- Severity of alerts (higher = more suspicious)
- Recency (closer to trade = more suspicious)
- Alert types (some types more suspicious than others)
Args:
trade: Trade object
prior_alerts: List of alerts before trade
Returns:
Dict with timing analysis
"""
if not prior_alerts:
return {
"timing_score": 0,
"suspicious": False,
"reason": "No unusual market activity before trade",
"alert_count": 0,
}
# Calculate base score from alert count and severity
total_severity = sum(alert.severity or 0 for alert in prior_alerts)
avg_severity = total_severity / len(prior_alerts)
base_score = min(50, len(prior_alerts) * 5 + avg_severity * 2)
# Bonus for recent alerts (within 7 days)
recent_count = sum(
1
for alert in prior_alerts
if (trade.transaction_date - alert.timestamp.date()).days <= 7
)
recency_bonus = recent_count * 10
# Bonus for high-severity alerts
high_sev_count = sum(1 for alert in prior_alerts if (alert.severity or 0) >= 7)
severity_bonus = high_sev_count * 15
# Calculate final score (0-100)
timing_score = min(100, base_score + recency_bonus + severity_bonus)
# Determine suspicion level
suspicious = timing_score >= 60
highly_suspicious = timing_score >= 80
if highly_suspicious:
reason = (
f"Trade occurred after {len(prior_alerts)} alerts, "
f"including {high_sev_count} high-severity. "
f"High likelihood of timing advantage."
)
elif suspicious:
reason = (
f"Trade occurred after {len(prior_alerts)} alerts. "
f"Possible timing advantage."
)
else:
reason = (
f"Some unusual activity before trade ({len(prior_alerts)} alerts), "
f"but timing score is low."
)
return {
"timing_score": round(timing_score, 2),
"suspicious": suspicious,
"highly_suspicious": highly_suspicious,
"reason": reason,
"alert_count": len(prior_alerts),
"recent_alert_count": recent_count,
"high_severity_count": high_sev_count,
"avg_severity": round(avg_severity, 2),
"max_severity": max(alert.severity or 0 for alert in prior_alerts),
}
def analyze_trade(self, trade: Trade, lookback_days: int = 30) -> dict[str, Any]:
"""
Full analysis of a single trade.
Args:
trade: Trade object
lookback_days: Days to look back
Returns:
Complete analysis dict
"""
# Get prior alerts
prior_alerts = self.get_alerts_before_trade(trade, lookback_days)
# Calculate timing score
timing_analysis = self.calculate_timing_score(trade, prior_alerts)
# Build full analysis
analysis = {
"trade_id": trade.id,
"official_name": trade.official.name if trade.official else None,
"ticker": trade.security.ticker if trade.security else None,
"side": trade.side,
"transaction_date": str(trade.transaction_date),
"filing_date": str(trade.filing_date) if trade.filing_date else None,
"value_range": f"${float(trade.value_min):,.0f}"
+ (
f"-${float(trade.value_max):,.0f}"
if trade.value_max
else "+"
),
**timing_analysis,
"prior_alerts": [
{
"timestamp": str(alert.timestamp),
"alert_type": alert.alert_type,
"severity": alert.severity,
"days_before_trade": (
trade.transaction_date - alert.timestamp.date()
).days,
}
for alert in prior_alerts
],
}
return analysis
def analyze_recent_disclosures(
self, days: int = 7, min_timing_score: float = 50
) -> list[dict[str, Any]]:
"""
Analyze recently filed trades for suspicious timing.
Args:
days: Analyze trades filed in last N days
min_timing_score: Minimum timing score to include
Returns:
List of suspicious trade analyses
"""
# Get recent trades
since_date = date.today() - timedelta(days=days)
trades = (
self.session.query(Trade)
.filter(Trade.created_at >= since_date)
.join(Trade.official)
.join(Trade.security)
.all()
)
logger.info(f"Analyzing {len(trades)} trades filed in last {days} days")
suspicious_trades = []
for trade in trades:
analysis = self.analyze_trade(trade)
if analysis["timing_score"] >= min_timing_score:
suspicious_trades.append(analysis)
logger.info(
f"Found {len(suspicious_trades)} trades with timing score >= {min_timing_score}"
)
return sorted(
suspicious_trades, key=lambda x: x["timing_score"], reverse=True
)
def get_official_timing_pattern(
self, official_id: int, lookback_days: int = 365
) -> dict[str, Any]:
"""
Analyze an official's historical trading timing patterns.
Args:
official_id: Official ID
lookback_days: Days of history to analyze
Returns:
Pattern analysis dict
"""
since_date = date.today() - timedelta(days=lookback_days)
trades = (
self.session.query(Trade)
.filter(
and_(Trade.official_id == official_id, Trade.transaction_date >= since_date)
)
.join(Trade.security)
.all()
)
if not trades:
return {
"official_id": official_id,
"trade_count": 0,
"pattern": "No trades in period",
}
# Analyze each trade
analyses = []
for trade in trades:
analysis = self.analyze_trade(trade)
analyses.append(analysis)
# Calculate aggregate statistics
total_trades = len(analyses)
trades_with_alerts = sum(1 for a in analyses if a["alert_count"] > 0)
suspicious_trades = sum(1 for a in analyses if a["suspicious"])
highly_suspicious = sum(1 for a in analyses if a.get("highly_suspicious", False))
avg_timing_score = (
sum(a["timing_score"] for a in analyses) / total_trades
if total_trades > 0
else 0
)
# Determine pattern
if suspicious_trades / total_trades > 0.5:
pattern = "HIGHLY SUSPICIOUS - Majority of trades show timing advantage"
elif suspicious_trades / total_trades > 0.25:
pattern = "SUSPICIOUS - Significant portion of trades show timing advantage"
elif trades_with_alerts / total_trades > 0.5:
pattern = "NOTABLE - Many trades preceded by market alerts"
else:
pattern = "NORMAL - Typical trading pattern"
return {
"official_id": official_id,
"trade_count": total_trades,
"trades_with_prior_alerts": trades_with_alerts,
"suspicious_trade_count": suspicious_trades,
"highly_suspicious_count": highly_suspicious,
"avg_timing_score": round(avg_timing_score, 2),
"pattern": pattern,
"analyses": analyses,
}
def get_ticker_timing_analysis(
self, ticker: str, lookback_days: int = 365
) -> dict[str, Any]:
"""
Analyze timing patterns for a specific ticker.
Args:
ticker: Stock ticker
lookback_days: Days of history
Returns:
Ticker-specific timing analysis
"""
since_date = date.today() - timedelta(days=lookback_days)
trades = (
self.session.query(Trade)
.join(Trade.security)
.filter(
and_(Security.ticker == ticker, Trade.transaction_date >= since_date)
)
.join(Trade.official)
.all()
)
if not trades:
return {
"ticker": ticker,
"trade_count": 0,
"pattern": "No trades in period",
}
analyses = [self.analyze_trade(trade) for trade in trades]
return {
"ticker": ticker,
"trade_count": len(analyses),
"trades_with_alerts": sum(1 for a in analyses if a["alert_count"] > 0),
"suspicious_count": sum(1 for a in analyses if a["suspicious"]),
"avg_timing_score": round(
sum(a["timing_score"] for a in analyses) / len(analyses), 2
),
"analyses": sorted(analyses, key=lambda x: x["timing_score"], reverse=True),
}