Phase 1: Real-Time Market Monitoring System

COMPLETE: Real-time unusual activity detection for congressional tickers

New Database Model:
- MarketAlert: Stores unusual market activity alerts
  * Tracks volume spikes, price movements, volatility
  * JSON details field for flexible data storage
  * Severity scoring (1-10 scale)
  * Indexed for efficient queries by ticker/timestamp

New Modules:
- src/pote/monitoring/market_monitor.py: Core monitoring engine
  * get_congressional_watchlist(): Top 50 most-traded tickers
  * check_ticker(): Analyze single stock for unusual activity
  * scan_watchlist(): Batch analysis of multiple tickers
  * Detection logic:
    - Unusual volume (3x average)
    - Price spikes/drops (>5%)
    - High volatility (2x normal)
  * save_alerts(): Persist to database
  * get_recent_alerts(): Query historical alerts

- src/pote/monitoring/alert_manager.py: Alert formatting & filtering
  * format_alert_text(): Human-readable output
  * format_alert_html(): HTML email format
  * filter_alerts(): By severity, ticker, type
  * generate_summary_report(): Text/HTML reports
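The detection thresholds above (3x average volume, >5% price move, 2x normal volatility) reduce to a small pure function. The sketch below is illustrative only; `classify_bar` and its parameter names are hypothetical, not part of the module API:

```python
def classify_bar(volume, avg_volume, price_change_pct, volatility, avg_volatility):
    """Return the alert types triggered by one bar, per the thresholds above (sketch)."""
    alerts = []
    if avg_volume > 0 and volume > 3 * avg_volume:  # unusual volume (3x average)
        alerts.append("unusual_volume")
    if abs(price_change_pct) > 5.0:  # price spike/drop (>5%)
        alerts.append("price_spike" if price_change_pct > 0 else "price_drop")
    if avg_volatility > 0 and volatility > 2 * avg_volatility:  # high volatility (2x normal)
        alerts.append("high_volatility")
    return alerts
```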

Scripts:
- scripts/monitor_market.py: CLI monitoring tool
  * Continuous monitoring mode (--interval)
  * One-time scan (--once)
  * Custom ticker lists or auto-detect congressional watchlist
  * Severity filtering (--min-severity)
  * Report generation and saving

Migrations:
- alembic/versions/f44014715b40_add_market_alerts_table.py

Documentation:
- docs/11_live_market_monitoring.md: Complete explanation
  * Why you can't track WHO is trading
  * What IS possible (timing analysis)
  * How hybrid monitoring works
  * Data sources and APIs

Usage:
  # Monitor congressional tickers (one-time scan)
  python scripts/monitor_market.py --once

  # Continuous monitoring (every 5 minutes)
  python scripts/monitor_market.py --interval 300

  # Monitor specific tickers
  python scripts/monitor_market.py --tickers NVDA,MSFT,AAPL --once
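The one-shot mode also suits scheduled runs. An illustrative crontab entry (the repository path and log path below are placeholders, not part of the project):

```shell
# Run a one-time scan every 15 minutes during US market hours, Mon-Fri,
# appending output to a log file. Paths are placeholders.
*/15 9-16 * * 1-5  cd /path/to/pote && python scripts/monitor_market.py --once --min-severity 5 >> /var/log/pote_monitor.log 2>&1
```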

Next Steps (Phase 2):
- Disclosure correlation engine
- Timing advantage calculator
- Suspicious trade flagging
Commit cfaf38b0be (parent 8ba9d7ffdd)
ilia, 2025-12-15 15:10:49 -05:00
8 changed files with 1191 additions and 0 deletions

--- Alembic migration, revision 099810723175 (new file; empty auto-generated stub) ---
"""Add market_alerts table for real-time monitoring
Revision ID: 099810723175
Revises: 66fd166195e8
Create Date: 2025-12-15 15:07:22.605598
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '099810723175'
down_revision: Union[str, Sequence[str], None] = '66fd166195e8'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###

--- alembic/versions/f44014715b40_add_market_alerts_table.py (new file) ---
"""Add market_alerts table
Revision ID: f44014715b40
Revises: 099810723175
Create Date: 2025-12-15 15:08:35.934280
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'f44014715b40'
down_revision: Union[str, Sequence[str], None] = '099810723175'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('market_alerts',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('ticker', sa.String(length=20), nullable=False),
sa.Column('alert_type', sa.String(length=50), nullable=False),
sa.Column('timestamp', sa.DateTime(), nullable=False),
sa.Column('details', sa.JSON(), nullable=True),
sa.Column('price', sa.DECIMAL(precision=15, scale=4), nullable=True),
sa.Column('volume', sa.Integer(), nullable=True),
sa.Column('change_pct', sa.DECIMAL(precision=10, scale=4), nullable=True),
sa.Column('severity', sa.Integer(), nullable=True),
sa.Column('source', sa.String(length=50), nullable=False),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index('ix_market_alerts_alert_type', 'market_alerts', ['alert_type'], unique=False)
op.create_index(op.f('ix_market_alerts_ticker'), 'market_alerts', ['ticker'], unique=False)
op.create_index('ix_market_alerts_ticker_timestamp', 'market_alerts', ['ticker', 'timestamp'], unique=False)
op.create_index(op.f('ix_market_alerts_timestamp'), 'market_alerts', ['timestamp'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_market_alerts_timestamp'), table_name='market_alerts')
op.drop_index('ix_market_alerts_ticker_timestamp', table_name='market_alerts')
op.drop_index(op.f('ix_market_alerts_ticker'), table_name='market_alerts')
op.drop_index('ix_market_alerts_alert_type', table_name='market_alerts')
op.drop_table('market_alerts')
# ### end Alembic commands ###

--- docs/11_live_market_monitoring.md (new file) ---
# Live Market Monitoring + Congressional Trading Analysis
## 🎯 What's Possible vs Impossible
### ❌ **NOT Possible:**
- Identify WHO is buying/selling in real-time
- Match live trades to specific Congress members
- See congressional trades before they're disclosed
### ✅ **IS Possible:**
- Track unusual market activity in real-time
- Monitor stocks Congress members historically trade
- Compare unusual activity to later disclosures
- Detect patterns (timing, sectors, etc.)
---
## 🔄 **Two-Phase Monitoring System**
### **Phase 1: Real-Time Market Monitoring**
Monitor unusual activity in stocks Congress trades:
- Unusual options flow
- Large block trades
- Volatility spikes
- Volume anomalies
### **Phase 2: Retroactive Analysis (30-45 days later)**
When disclosures come in:
- Match disclosed trades to earlier unusual activity
- Identify if Congress bought BEFORE or AFTER spikes
- Calculate timing advantage (if any)
- Build pattern database
---
## 📊 **Implementation: Watchlist-Based Monitoring**
### **Concept:**
```
Step 1: Congress Member Trades (Historical)
    Nancy Pelosi often trades: NVDA, MSFT, GOOGL, AAPL
    Dan Crenshaw often trades: XOM, CVX, LMT, BA

Step 2: Create Monitoring Watchlist
    Monitor these tickers in real-time for:
    - Unusual options activity
    - Large block trades
    - Price/volume anomalies

Step 3: When Disclosure Appears (30-45 days later)
    Compare:
    - Did they buy BEFORE unusual activity? (Suspicious)
    - Did they buy AFTER? (Following market)
    - What was the timing advantage?
```
---
## 🛠️ **Data Sources for Live Market Monitoring**
### **Free/Low-Cost Options:**

1. **Yahoo Finance (yfinance)**
   - ✅ Real-time quotes (15-min delay on the free tier)
   - ✅ Historical options data
   - ✅ Volume data
   - ❌ Not true real-time for options flow

2. **Unusual Whales API**
   - ✅ Options flow data
   - ✅ Unusual activity alerts
   - 💰 Paid ($50-200/month)
   - https://unusualwhales.com/

3. **Tradier API**
   - ✅ Real-time market data
   - ✅ Options chains
   - 💰 Paid but affordable ($10-50/month)
   - https://tradier.com/

4. **FlowAlgo**
   - ✅ Options flow tracking
   - ✅ Dark pool data
   - 💰 Paid ($99-399/month)
   - https://www.flowalgo.com/

5. **Polygon.io**
   - ✅ Real-time stock data
   - ✅ Options data
   - 💰 Free tier + paid plans
   - https://polygon.io/
### **Best Free Option: Build Your Own with yfinance**
Track volume/price changes every 5 minutes for congressional watchlist tickers.
---
## 💡 **Practical Hybrid System**
### **What We Can Build:**
```python
# Pseudo-code for hybrid monitoring

# 1. Get stocks Congress trades
congress_tickers = get_tickers_congress_trades()
# Result: ["NVDA", "MSFT", "TSLA", "AAPL", "SPY", ...]

# 2. Monitor these tickers for unusual activity
while market_open():
    for ticker in congress_tickers:
        current_data = get_realtime_data(ticker)
        if is_unusual_activity(current_data):
            log_alert({
                "ticker": ticker,
                "type": "unusual_volume",  # or "price_spike", "options_flow"
                "timestamp": now(),
                "details": current_data,
            })

# 3. When disclosures appear (30-45 days later)
new_disclosures = fetch_congressional_trades()
for disclosure in new_disclosures:
    # Check whether we logged unusual activity BEFORE their trade date
    prior_alerts = get_alerts_before_date(
        ticker=disclosure.ticker,
        before_date=disclosure.transaction_date,
    )
    if prior_alerts:
        # Unusual activity in the run-up to their trade = worth investigating
        flag_suspicious(disclosure, prior_alerts)
    else:
        # No unusual activity beforehand = likely just following the market
        flag_following(disclosure)
```
---
## 📈 **Example: Nancy Pelosi NVDA Trade Analysis**
### **Timeline:**
```
Nov 10, 2024:
    🔔 ALERT: NVDA unusual call options activity
    Volume: 10x average
    Strike: $500 (2 weeks out)

Nov 15, 2024:
    💰 Someone buys NVDA (unknown who at the time)

Nov 18, 2024:
    📰 NVDA announces new AI chip
    📈 Stock jumps 15%

Dec 15, 2024:
    📋 Disclosure: Nancy Pelosi bought NVDA on Nov 15
    Value: $15,001-$50,000

ANALYSIS:
    ✅ She bought AFTER unusual options activity (Nov 10)
    ❓ She bought BEFORE announcement (Nov 18)
    ⏱️ Timing: 3 days before major news
    🚩 Flag: Investigate if announcement was public knowledge
```
---
## 🎯 **Recommended Approach**
### **Phase 1: Build Congressional Ticker Watchlist**
```python
# scripts/build_ticker_watchlist.py
from pote.db import get_session
from pote.db.models import Trade, Security
from sqlalchemy import func

def get_most_traded_tickers(limit=50):
    """Get tickers Congress trades most frequently."""
    session = next(get_session())
    results = (
        session.query(
            Security.ticker,
            func.count(Trade.id).label('trade_count'),
        )
        .join(Trade)
        .group_by(Security.ticker)
        .order_by(func.count(Trade.id).desc())
        .limit(limit)
        .all()
    )
    return [r[0] for r in results]

# Result: Top 50 tickers Congress trades
# Use these for real-time monitoring
```
### **Phase 2: Real-Time Monitoring (Simple)**
```python
# scripts/monitor_congressional_tickers.py
import time

import yfinance as yf

def monitor_tickers(tickers, interval_minutes=5):
    """Monitor tickers for unusual activity."""
    baseline = {}  # Store baseline metrics (reserved for cross-scan comparisons)
    while True:
        for ticker in tickers:
            try:
                stock = yf.Ticker(ticker)
                current = stock.history(period="1d", interval="1m")
                if len(current) > 0:
                    latest = current.iloc[-1]

                    # Check for unusual volume
                    avg_volume = current['Volume'].mean()
                    if latest['Volume'] > avg_volume * 3:
                        alert(f"🔔 {ticker}: Unusual volume spike!")  # alert() = notification helper

                    # Check for price movement since the open
                    price_change = (latest['Close'] - current['Open'].iloc[0]) / current['Open'].iloc[0]
                    if abs(price_change) > 0.05:  # 5% move
                        alert(f"📈 {ticker}: {price_change:.2%} move today!")
            except Exception as e:
                print(f"Error monitoring {ticker}: {e}")
        time.sleep(interval_minutes * 60)
```
### **Phase 3: Retroactive Analysis**
When disclosures appear, analyze timing:
```python
# scripts/analyze_trade_timing.py
from datetime import timedelta

def analyze_disclosure_timing(disclosure):
    """
    When a disclosure appears, check if there was unusual
    activity BEFORE the trade date.
    """
    # Get alerts from 7 days before trade
    # (get_alerts_in_range / get_price_change are helpers assumed to exist)
    lookback_start = disclosure.transaction_date - timedelta(days=7)
    lookback_end = disclosure.transaction_date
    alerts = get_alerts_in_range(
        ticker=disclosure.ticker,
        start=lookback_start,
        end=lookback_end,
    )
    if alerts:
        return {
            "suspicious": True,
            "reason": "Unusual activity before trade",
            "alerts": alerts,
        }

    # Check whether the trade preceded a major price movement
    post_trade_price = get_price_change(
        ticker=disclosure.ticker,
        start=disclosure.transaction_date,
        days=30,
    )
    if post_trade_price > 0.10:  # 10% gain
        return {
            "notable": True,
            "reason": f"Stock up {post_trade_price:.1%} after trade",
            "gain": post_trade_price,
        }
    return {"suspicious": False, "notable": False}
```
---
## 🚨 **Realistic Expectations**
### **What This System Will Do:**
- ✅ Monitor stocks Congress members historically trade
- ✅ Alert on unusual market activity in those stocks
- ✅ Retroactively correlate disclosures with earlier alerts
- ✅ Identify timing patterns and potential advantages
- ✅ Build a database of congressional trading patterns

### **What This System WON'T Do:**
- ❌ Identify WHO is buying in real-time
- ❌ Give you advance notice of congressional trades
- ❌ Provide real-time inside information
- ❌ Allow you to "front-run" Congress

### **Legal & Ethical:**
- ✅ All data is public
- ✅ Analysis is retrospective
- ✅ For research and transparency
- ✅ Not market manipulation
- ❌ Cannot and should not be used to replicate potentially illegal trades
---
## 📊 **Proposed Implementation**
### **New Scripts to Create:**
1. **`scripts/build_congressional_watchlist.py`**
   - Analyzes historical trades
   - Identifies most-traded tickers
   - Creates monitoring watchlist

2. **`scripts/monitor_market_live.py`**
   - Monitors watchlist tickers
   - Detects unusual activity
   - Logs alerts to database

3. **`scripts/analyze_disclosure_timing.py`**
   - Runs when new disclosures appear
   - Checks for prior unusual activity
   - Flags suspicious timing

4. **`scripts/generate_timing_report.py`**
   - Shows disclosures with unusual timing
   - Calculates timing advantage
   - Identifies patterns
### **New Database Tables:**
```sql
-- Track unusual market activity
CREATE TABLE market_alerts (
    id SERIAL PRIMARY KEY,
    ticker VARCHAR(20),
    alert_type VARCHAR(50),  -- 'unusual_volume', 'price_spike', 'options_flow'
    timestamp TIMESTAMP,
    details JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Link disclosures to prior alerts
CREATE TABLE disclosure_timing_analysis (
    id SERIAL PRIMARY KEY,
    trade_id INTEGER REFERENCES trades(id),
    suspicious_flag BOOLEAN,
    timing_score DECIMAL(5,2),  -- 0-100 score
    prior_alerts JSONB,
    post_trade_performance DECIMAL(10,4),
    created_at TIMESTAMP DEFAULT NOW()
);
```
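The `timing_score` column is left unspecified above. One simple way to fill it (purely illustrative, not part of the proposal; `timing_score` here is a hypothetical helper) is to weight prior-alert severity by how close each alert falls to the trade date:

```python
from datetime import date

def timing_score(trade_date, prior_alerts, window_days=7):
    """Illustrative 0-100 score: severe alerts close to the trade date score high.

    prior_alerts: list of (alert_date, severity 1-10) tuples.
    """
    score = 0.0
    for alert_date, severity in prior_alerts:
        days_before = (trade_date - alert_date).days
        if 0 <= days_before <= window_days:
            # 1.0 for a same-day alert, decaying linearly toward the window edge
            proximity = 1 - days_before / (window_days + 1)
            score += severity * 10 * proximity  # one max-severity same-day alert -> 100
    return round(min(score, 100.0), 2)
```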
---
## 🎯 **Summary**
### **Your Question:**
> "Can we read live trades being made and compare them to a name?"
### **Answer:**
**No** - live trades are anonymous and cannot be matched to specific individuals.

**BUT** you CAN:
1. Monitor unusual activity in stocks Congress trades
2. Log these alerts in real-time
3. When disclosures appear (30-45 days later), correlate them
4. Identify if Congress bought BEFORE or AFTER unusual activity
5. Build patterns database of timing and performance
### **This Gives You:**
- ✅ Transparency on timing advantages
- ✅ Pattern detection across officials
- ✅ Research-grade analysis
- ✅ Historical correlation data
### **This Does NOT Give You:**
- ❌ Real-time identity of traders
- ❌ Advance notice of congressional trades
- ❌ Ability to "front-run" disclosures
---
## 🚀 **Would You Like Me To Build This?**
I can create:
1. ✅ Real-time monitoring system for congressional tickers
2. ✅ Alert logging and analysis
3. ✅ Timing correlation when disclosures appear
4. ✅ Pattern detection and reporting
This would be **Phase 2.5** of POTE - the "timing analysis" module.
**Should I proceed with implementation?**

--- scripts/monitor_market.py (new executable file) ---
#!/usr/bin/env python
"""
Real-time market monitoring for congressional tickers.

Run this continuously or on a schedule to detect unusual activity.
"""
import logging
import time
from datetime import datetime
from pathlib import Path

import click

from pote.db import get_session
from pote.monitoring.alert_manager import AlertManager
from pote.monitoring.market_monitor import MarketMonitor

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


@click.command()
@click.option("--tickers", help="Comma-separated list of tickers (default: congressional watchlist)")
@click.option("--interval", default=300, help="Scan interval in seconds (default: 300 = 5 minutes)")
@click.option("--once", is_flag=True, help="Run once and exit (no continuous monitoring)")
@click.option("--min-severity", default=5, help="Minimum severity to report (1-10)")
@click.option("--save-report", help="Save report to file")
@click.option("--lookback", default=5, help="Days of history to analyze (default: 5)")
def main(tickers, interval, once, min_severity, save_report, lookback):
    """Monitor market for unusual activity in congressional tickers."""
    session = next(get_session())
    monitor = MarketMonitor(session)
    alert_mgr = AlertManager(session)

    # Parse tickers if provided
    ticker_list = None
    if tickers:
        ticker_list = [t.strip().upper() for t in tickers.split(",")]
        logger.info(f"Monitoring {len(ticker_list)} specified tickers")
    else:
        logger.info("Monitoring congressional watchlist")

    def run_scan():
        """Run a single scan."""
        logger.info("=" * 80)
        logger.info(f"Starting market scan at {datetime.now()}")
        logger.info("=" * 80)
        try:
            # Scan for unusual activity
            alerts = monitor.scan_watchlist(tickers=ticker_list, lookback_days=lookback)
            if alerts:
                logger.info(f"\n🔔 Found {len(alerts)} alerts!")

                # Save to database
                monitor.save_alerts(alerts)

                # Get MarketAlert objects for reporting
                from pote.db.models import MarketAlert

                alert_objects = (
                    session.query(MarketAlert)
                    .order_by(MarketAlert.timestamp.desc())
                    .limit(len(alerts))
                    .all()
                )

                # Filter by severity
                filtered = alert_mgr.filter_alerts(alert_objects, min_severity=min_severity)
                if filtered:
                    # Generate report
                    report = alert_mgr.generate_summary_report(filtered, format="text")
                    print("\n" + report)

                    # Save report if requested
                    if save_report:
                        Path(save_report).write_text(report)
                        logger.info(f"Report saved to {save_report}")
                else:
                    logger.info(f"No alerts above severity {min_severity}")
            else:
                logger.info("✅ No unusual activity detected")
        except Exception as e:
            logger.error(f"Error during scan: {e}", exc_info=True)

        logger.info("=" * 80)
        logger.info(f"Scan complete at {datetime.now()}")
        logger.info("=" * 80)

    # Run scan
    run_scan()

    # Continuous monitoring mode
    if not once:
        logger.info(f"\n🔄 Continuous monitoring enabled (interval: {interval}s)")
        logger.info("Press Ctrl+C to stop\n")
        try:
            while True:
                time.sleep(interval)
                run_scan()
        except KeyboardInterrupt:
            logger.info("\n\n⏹️ Monitoring stopped by user")
    else:
        logger.info("\n✅ Single scan complete (use --interval for continuous monitoring)")


if __name__ == "__main__":
    main()

--- src/pote/db/models.py (modified) ---

`JSON` is added to the existing `from sqlalchemy import (...)` block (alongside ForeignKey, Index, Integer, String, Text, UniqueConstraint), and the following model is appended after MetricTrade:
class MarketAlert(Base):
    """
    Real-time market activity alerts.

    Tracks unusual volume, price movements, and other anomalies.
    """

    __tablename__ = "market_alerts"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    ticker: Mapped[str] = mapped_column(String(20), nullable=False, index=True)
    alert_type: Mapped[str] = mapped_column(
        String(50), nullable=False
    )  # 'unusual_volume', 'price_spike', 'options_flow', etc.
    timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True)

    # Alert details (stored as JSON)
    details: Mapped[dict | None] = mapped_column(JSON)

    # Metrics at time of alert
    price: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 4))
    volume: Mapped[int | None] = mapped_column(Integer)
    change_pct: Mapped[Decimal | None] = mapped_column(DECIMAL(10, 4))  # Price change %

    # Severity scoring
    severity: Mapped[int | None] = mapped_column(Integer)  # 1-10 scale

    # Metadata
    source: Mapped[str] = mapped_column(String(50), default="market_monitor")
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=lambda: datetime.now(timezone.utc)
    )

    # Indexes for efficient queries
    __table_args__ = (
        Index("ix_market_alerts_ticker_timestamp", "ticker", "timestamp"),
        Index("ix_market_alerts_alert_type", "alert_type"),
    )

    def __repr__(self) -> str:
        return (
            f"<MarketAlert(ticker='{self.ticker}', type='{self.alert_type}', "
            f"timestamp={self.timestamp}, severity={self.severity})>"
        )

--- src/pote/monitoring/__init__.py (new file) ---
"""
Market monitoring module.
Real-time tracking of unusual market activity.
"""
from .market_monitor import MarketMonitor
from .alert_manager import AlertManager
__all__ = ["MarketMonitor", "AlertManager"]

--- src/pote/monitoring/alert_manager.py (new file) ---
"""
Alert management and notification system.
Handles alert filtering, formatting, and delivery.
"""
import logging
from datetime import datetime, timezone
from typing import Any
from sqlalchemy.orm import Session
from pote.db.models import MarketAlert
logger = logging.getLogger(__name__)
class AlertManager:
"""Manage and deliver market alerts."""
def __init__(self, session: Session):
"""Initialize alert manager."""
self.session = session
def format_alert_text(self, alert: MarketAlert) -> str:
"""
Format alert as human-readable text.
Args:
alert: MarketAlert object
Returns:
Formatted alert string
"""
emoji_map = {
"unusual_volume": "📊",
"price_spike": "🚀",
"price_drop": "📉",
"high_volatility": "",
"options_flow": "💰",
}
emoji = emoji_map.get(alert.alert_type, "🔔")
severity_stars = "" * min(alert.severity or 1, 5)
lines = [
f"{emoji} {alert.ticker} - {alert.alert_type.upper().replace('_', ' ')}",
f" Severity: {severity_stars} ({alert.severity}/10)",
f" Time: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}",
f" Price: ${float(alert.price):.2f}" if alert.price else "",
f" Volume: {alert.volume:,}" if alert.volume else "",
f" Change: {float(alert.change_pct):+.2f}%" if alert.change_pct else "",
]
# Add details
if alert.details:
lines.append(" Details:")
for key, value in alert.details.items():
if isinstance(value, (int, float)):
if "pct" in key.lower() or "change" in key.lower():
lines.append(f" {key}: {value:+.2f}%")
else:
lines.append(f" {key}: {value:,.2f}")
else:
lines.append(f" {key}: {value}")
return "\n".join(line for line in lines if line)
def format_alert_html(self, alert: MarketAlert) -> str:
"""
Format alert as HTML.
Args:
alert: MarketAlert object
Returns:
HTML formatted alert
"""
severity_class = "high" if (alert.severity or 0) >= 7 else "medium" if (alert.severity or 0) >= 4 else "low"
html = f"""
<div class="alert {severity_class}">
<h3>{alert.ticker} - {alert.alert_type.replace('_', ' ').title()}</h3>
<p class="timestamp">{alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}</p>
<p class="severity">Severity: {alert.severity}/10</p>
<div class="metrics">
<span>Price: ${float(alert.price):.2f}</span>
<span>Volume: {alert.volume:,}</span>
<span>Change: {float(alert.change_pct):+.2f}%</span>
</div>
</div>
"""
return html
def filter_alerts(
self,
alerts: list[MarketAlert],
min_severity: int = 5,
tickers: list[str] | None = None,
alert_types: list[str] | None = None,
) -> list[MarketAlert]:
"""
Filter alerts by criteria.
Args:
alerts: List of alerts
min_severity: Minimum severity threshold
tickers: Only include these tickers (None = all)
alert_types: Only include these types (None = all)
Returns:
Filtered list of alerts
"""
filtered = alerts
# Filter by severity
filtered = [a for a in filtered if (a.severity or 0) >= min_severity]
# Filter by ticker
if tickers:
ticker_set = set(t.upper() for t in tickers)
filtered = [a for a in filtered if a.ticker.upper() in ticker_set]
# Filter by alert type
if alert_types:
type_set = set(alert_types)
filtered = [a for a in filtered if a.alert_type in type_set]
return filtered
def generate_summary_report(
self, alerts: list[MarketAlert], format: str = "text"
) -> str:
"""
Generate summary report of alerts.
Args:
alerts: List of alerts
format: Output format ('text' or 'html')
Returns:
Formatted summary report
"""
if format == "html":
return self._generate_html_summary(alerts)
else:
return self._generate_text_summary(alerts)
def _generate_text_summary(self, alerts: list[MarketAlert]) -> str:
"""Generate text summary report."""
if not alerts:
return "📭 No alerts to report."
lines = [
"=" * 80,
f" MARKET ACTIVITY ALERTS - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC",
f" {len(alerts)} Alerts",
"=" * 80,
"",
]
# Group by ticker
by_ticker: dict[str, list[MarketAlert]] = {}
for alert in alerts:
if alert.ticker not in by_ticker:
by_ticker[alert.ticker] = []
by_ticker[alert.ticker].append(alert)
# Sort tickers by max severity
sorted_tickers = sorted(
by_ticker.keys(),
key=lambda t: max((a.severity or 0) for a in by_ticker[t]),
reverse=True,
)
for ticker in sorted_tickers:
ticker_alerts = by_ticker[ticker]
max_sev = max((a.severity or 0) for a in ticker_alerts)
lines.append("" * 80)
lines.append(f"🎯 {ticker} - {len(ticker_alerts)} alerts (Max Severity: {max_sev}/10)")
lines.append("" * 80)
for alert in sorted(
ticker_alerts, key=lambda a: a.severity or 0, reverse=True
):
lines.append("")
lines.append(self.format_alert_text(alert))
lines.append("")
# Summary statistics
lines.append("=" * 80)
lines.append("📊 SUMMARY")
lines.append("=" * 80)
lines.append("")
lines.append(f"Total Alerts: {len(alerts)}")
lines.append(f"Unique Tickers: {len(by_ticker)}")
# Alert type breakdown
type_counts: dict[str, int] = {}
for alert in alerts:
type_counts[alert.alert_type] = type_counts.get(alert.alert_type, 0) + 1
lines.append("\nAlert Types:")
for alert_type, count in sorted(
type_counts.items(), key=lambda x: x[1], reverse=True
):
lines.append(f" {alert_type.replace('_', ' ').title():20s}: {count}")
# Top severity alerts
lines.append("\nTop 5 Highest Severity:")
top_alerts = sorted(alerts, key=lambda a: a.severity or 0, reverse=True)[:5]
for alert in top_alerts:
lines.append(
f" {alert.ticker:6s} - {alert.alert_type:20s} (Severity: {alert.severity}/10)"
)
lines.append("")
lines.append("=" * 80)
return "\n".join(lines)
def _generate_html_summary(self, alerts: list[MarketAlert]) -> str:
"""Generate HTML summary report."""
html_parts = [
"<html><head><style>",
"body { font-family: Arial, sans-serif; }",
".alert { border: 1px solid #ddd; padding: 15px; margin: 10px 0; border-radius: 5px; }",
".alert.high { background-color: #ffebee; border-color: #f44336; }",
".alert.medium { background-color: #fff3e0; border-color: #ff9800; }",
".alert.low { background-color: #e8f5e9; border-color: #4caf50; }",
".timestamp { color: #666; font-size: 0.9em; }",
".metrics span { margin-right: 20px; }",
"</style></head><body>",
f"<h1>Market Activity Alerts</h1>",
f"<p><strong>{len(alerts)} Alerts</strong> | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC</p>",
]
for alert in sorted(alerts, key=lambda a: a.severity or 0, reverse=True):
html_parts.append(self.format_alert_html(alert))
html_parts.append("</body></html>")
return "\n".join(html_parts)

--- src/pote/monitoring/market_monitor.py (new file) ---
"""
Real-time market monitoring for congressional tickers.
Detects unusual activity: volume spikes, price movements, volatility.
"""
import logging
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from typing import Any
import yfinance as yf
from sqlalchemy.orm import Session
from pote.db.models import MarketAlert, Security, Trade
logger = logging.getLogger(__name__)
class MarketMonitor:
"""Monitor stocks for unusual market activity."""
def __init__(self, session: Session):
"""Initialize market monitor."""
self.session = session
def get_congressional_watchlist(self, limit: int = 50) -> list[str]:
"""
Get list of most-traded tickers by Congress.
Args:
limit: Maximum number of tickers to return
Returns:
List of ticker symbols
"""
from sqlalchemy import func
result = (
self.session.query(Security.ticker, func.count(Trade.id).label("count"))
.join(Trade)
.group_by(Security.ticker)
.order_by(func.count(Trade.id).desc())
.limit(limit)
.all()
)
tickers = [r[0] for r in result]
logger.info(f"Built watchlist of {len(tickers)} tickers from congressional trades")
return tickers
def check_ticker(self, ticker: str, lookback_days: int = 5) -> list[dict[str, Any]]:
"""
Check a single ticker for unusual activity.
Args:
ticker: Stock ticker symbol
lookback_days: Days of history to analyze
Returns:
List of alerts detected
"""
alerts = []
try:
stock = yf.Ticker(ticker)
# Get recent history
hist = stock.history(period=f"{lookback_days}d", interval="1d")
if len(hist) < 2:
logger.warning(f"Insufficient data for {ticker}")
return alerts
# Calculate baseline metrics
avg_volume = hist["Volume"].mean()
avg_price_change = hist["Close"].pct_change().abs().mean()
# Get latest data
latest = hist.iloc[-1]
prev = hist.iloc[-2]
current_volume = latest["Volume"]
current_price = latest["Close"]
price_change = (current_price - prev["Close"]) / prev["Close"]
# Check for unusual volume (3x average)
if current_volume > avg_volume * 3 and avg_volume > 0:
severity = min(10, int((current_volume / avg_volume) - 2))
alerts.append(
{
"ticker": ticker,
"alert_type": "unusual_volume",
"timestamp": datetime.now(timezone.utc),
"details": {
"current_volume": int(current_volume),
"avg_volume": int(avg_volume),
"multiplier": round(current_volume / avg_volume, 2),
},
"price": Decimal(str(current_price)),
"volume": int(current_volume),
"change_pct": Decimal(str(price_change * 100)),
"severity": severity,
}
)
# Check for significant price movement (>5%)
if abs(price_change) > 0.05:
severity = min(10, int(abs(price_change) * 100 / 2))
alerts.append(
{
"ticker": ticker,
"alert_type": "price_spike"
if price_change > 0
else "price_drop",
"timestamp": datetime.now(timezone.utc),
"details": {
"current_price": float(current_price),
"prev_price": float(prev["Close"]),
"change_pct": round(price_change * 100, 2),
},
"price": Decimal(str(current_price)),
"volume": int(current_volume),
"change_pct": Decimal(str(price_change * 100)),
"severity": severity,
}
)
# Check for unusual volatility (price swings)
if len(hist) >= 5:
recent_volatility = hist["Close"].iloc[-5:].pct_change().abs().mean()
if recent_volatility > avg_price_change * 2 and avg_price_change > 0:
severity = min(
10, int((recent_volatility / avg_price_change) - 1)
)
alerts.append(
{
"ticker": ticker,
"alert_type": "high_volatility",
"timestamp": datetime.now(timezone.utc),
"details": {
"recent_volatility": round(recent_volatility * 100, 2),
"avg_volatility": round(avg_price_change * 100, 2),
"multiplier": round(recent_volatility / avg_price_change, 2),
},
"price": Decimal(str(current_price)),
"volume": int(current_volume),
"change_pct": Decimal(str(price_change * 100)),
"severity": severity,
}
)
except Exception as e:
logger.error(f"Error checking {ticker}: {e}")
return alerts
def scan_watchlist(
self, tickers: list[str] | None = None, lookback_days: int = 5
) -> list[dict[str, Any]]:
"""
Scan multiple tickers for unusual activity.
Args:
tickers: List of tickers to scan (None = use congressional watchlist)
lookback_days: Days of history to analyze
Returns:
List of all alerts detected
"""
if tickers is None:
tickers = self.get_congressional_watchlist()
all_alerts = []
logger.info(f"Scanning {len(tickers)} tickers for unusual activity...")
for ticker in tickers:
alerts = self.check_ticker(ticker, lookback_days=lookback_days)
all_alerts.extend(alerts)
if alerts:
logger.info(
f"🔔 {ticker}: {len(alerts)} alerts - "
+ ", ".join(a["alert_type"] for a in alerts)
)
logger.info(f"Scan complete. Found {len(all_alerts)} total alerts.")
return all_alerts
def save_alerts(self, alerts: list[dict[str, Any]]) -> int:
"""
Save alerts to database.
Args:
alerts: List of alert dictionaries
Returns:
Number of alerts saved
"""
saved = 0
for alert_data in alerts:
alert = MarketAlert(**alert_data)
self.session.add(alert)
saved += 1
self.session.commit()
logger.info(f"Saved {saved} alerts to database")
return saved
def get_recent_alerts(
self,
ticker: str | None = None,
days: int = 7,
alert_type: str | None = None,
min_severity: int = 0,
) -> list[MarketAlert]:
"""
Query recent alerts from database.
Args:
ticker: Filter by ticker (None = all)
days: Look back this many days
alert_type: Filter by alert type (None = all)
min_severity: Minimum severity level
Returns:
List of MarketAlert objects
"""
since = datetime.now(timezone.utc) - timedelta(days=days)
query = self.session.query(MarketAlert).filter(MarketAlert.timestamp >= since)
if ticker:
query = query.filter(MarketAlert.ticker == ticker)
if alert_type:
query = query.filter(MarketAlert.alert_type == alert_type)
if min_severity > 0:
query = query.filter(MarketAlert.severity >= min_severity)
return query.order_by(MarketAlert.timestamp.desc()).all()
def get_ticker_alert_summary(self, days: int = 30) -> dict[str, dict]:
"""
Get summary of alerts by ticker.
Args:
days: Look back this many days
Returns:
Dict mapping ticker to alert summary
"""
since = datetime.now(timezone.utc) - timedelta(days=days)
from sqlalchemy import func
results = (
self.session.query(
MarketAlert.ticker,
func.count(MarketAlert.id).label("alert_count"),
func.avg(MarketAlert.severity).label("avg_severity"),
func.max(MarketAlert.severity).label("max_severity"),
)
.filter(MarketAlert.timestamp >= since)
.group_by(MarketAlert.ticker)
.order_by(func.count(MarketAlert.id).desc())
.all()
)
summary = {}
for r in results:
summary[r[0]] = {
"alert_count": r[1],
"avg_severity": round(float(r[2]), 2) if r[2] else 0,
"max_severity": r[3],
}
return summary