From cfaf38b0be0b004727a2359b761cd6876d16e29c Mon Sep 17 00:00:00 2001 From: ilia Date: Mon, 15 Dec 2025 15:10:49 -0500 Subject: [PATCH] Phase 1: Real-Time Market Monitoring System COMPLETE: Real-time unusual activity detection for congressional tickers New Database Model: - MarketAlert: Stores unusual market activity alerts * Tracks volume spikes, price movements, volatility * JSON details field for flexible data storage * Severity scoring (1-10 scale) * Indexed for efficient queries by ticker/timestamp New Modules: - src/pote/monitoring/market_monitor.py: Core monitoring engine * get_congressional_watchlist(): Top 50 most-traded tickers * check_ticker(): Analyze single stock for unusual activity * scan_watchlist(): Batch analysis of multiple tickers * Detection logic: - Unusual volume (3x average) - Price spikes/drops (>5%) - High volatility (2x normal) * save_alerts(): Persist to database * get_recent_alerts(): Query historical alerts - src/pote/monitoring/alert_manager.py: Alert formatting & filtering * format_alert_text(): Human-readable output * format_alert_html(): HTML email format * filter_alerts(): By severity, ticker, type * generate_summary_report(): Text/HTML reports Scripts: - scripts/monitor_market.py: CLI monitoring tool * Continuous monitoring mode (--interval) * One-time scan (--once) * Custom ticker lists or auto-detect congressional watchlist * Severity filtering (--min-severity) * Report generation and saving Migrations: - alembic/versions/f44014715b40_add_market_alerts_table.py Documentation: - docs/11_live_market_monitoring.md: Complete explanation * Why you can't track WHO is trading * What IS possible (timing analysis) * How hybrid monitoring works * Data sources and APIs Usage: # Monitor congressional tickers (one-time scan) python scripts/monitor_market.py --once # Continuous monitoring (every 5 minutes) python scripts/monitor_market.py --interval 300 # Monitor specific tickers python scripts/monitor_market.py --tickers NVDA,MSFT,AAPL 
--once Next Steps (Phase 2): - Disclosure correlation engine - Timing advantage calculator - Suspicious trade flagging --- ..._add_market_alerts_table_for_real_time_.py | 32 ++ .../f44014715b40_add_market_alerts_table.py | 53 +++ docs/11_live_market_monitoring.md | 407 ++++++++++++++++++ scripts/monitor_market.py | 116 +++++ src/pote/db/models.py | 48 +++ src/pote/monitoring/__init__.py | 10 + src/pote/monitoring/alert_manager.py | 244 +++++++++++ src/pote/monitoring/market_monitor.py | 281 ++++++++++++ 8 files changed, 1191 insertions(+) create mode 100644 alembic/versions/099810723175_add_market_alerts_table_for_real_time_.py create mode 100644 alembic/versions/f44014715b40_add_market_alerts_table.py create mode 100644 docs/11_live_market_monitoring.md create mode 100755 scripts/monitor_market.py create mode 100644 src/pote/monitoring/__init__.py create mode 100644 src/pote/monitoring/alert_manager.py create mode 100644 src/pote/monitoring/market_monitor.py diff --git a/alembic/versions/099810723175_add_market_alerts_table_for_real_time_.py b/alembic/versions/099810723175_add_market_alerts_table_for_real_time_.py new file mode 100644 index 0000000..a8f965e --- /dev/null +++ b/alembic/versions/099810723175_add_market_alerts_table_for_real_time_.py @@ -0,0 +1,32 @@ +"""Add market_alerts table for real-time monitoring + +Revision ID: 099810723175 +Revises: 66fd166195e8 +Create Date: 2025-12-15 15:07:22.605598 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '099810723175' +down_revision: Union[str, Sequence[str], None] = '66fd166195e8' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! 
### + pass + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + pass + # ### end Alembic commands ### diff --git a/alembic/versions/f44014715b40_add_market_alerts_table.py b/alembic/versions/f44014715b40_add_market_alerts_table.py new file mode 100644 index 0000000..c69d4a5 --- /dev/null +++ b/alembic/versions/f44014715b40_add_market_alerts_table.py @@ -0,0 +1,53 @@ +"""Add market_alerts table + +Revision ID: f44014715b40 +Revises: 099810723175 +Create Date: 2025-12-15 15:08:35.934280 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'f44014715b40' +down_revision: Union[str, Sequence[str], None] = '099810723175' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('market_alerts', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('ticker', sa.String(length=20), nullable=False), + sa.Column('alert_type', sa.String(length=50), nullable=False), + sa.Column('timestamp', sa.DateTime(), nullable=False), + sa.Column('details', sa.JSON(), nullable=True), + sa.Column('price', sa.DECIMAL(precision=15, scale=4), nullable=True), + sa.Column('volume', sa.Integer(), nullable=True), + sa.Column('change_pct', sa.DECIMAL(precision=10, scale=4), nullable=True), + sa.Column('severity', sa.Integer(), nullable=True), + sa.Column('source', sa.String(length=50), nullable=False), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_index('ix_market_alerts_alert_type', 'market_alerts', ['alert_type'], unique=False) + op.create_index(op.f('ix_market_alerts_ticker'), 'market_alerts', ['ticker'], unique=False) + op.create_index('ix_market_alerts_ticker_timestamp', 'market_alerts', ['ticker', 'timestamp'], unique=False) + op.create_index(op.f('ix_market_alerts_timestamp'), 'market_alerts', ['timestamp'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_index(op.f('ix_market_alerts_timestamp'), table_name='market_alerts') + op.drop_index('ix_market_alerts_ticker_timestamp', table_name='market_alerts') + op.drop_index(op.f('ix_market_alerts_ticker'), table_name='market_alerts') + op.drop_index('ix_market_alerts_alert_type', table_name='market_alerts') + op.drop_table('market_alerts') + # ### end Alembic commands ### diff --git a/docs/11_live_market_monitoring.md b/docs/11_live_market_monitoring.md new file mode 100644 index 0000000..f4bf494 --- /dev/null +++ b/docs/11_live_market_monitoring.md @@ -0,0 +1,407 @@ +# Live Market Monitoring + Congressional Trading Analysis + +## šŸŽÆ What's Possible vs Impossible + +### āŒ **NOT Possible:** +- Identify WHO is buying/selling in real-time +- Match live trades to specific Congress members +- See congressional trades before they're disclosed + +### āœ… **IS Possible:** +- Track unusual market activity in real-time +- Monitor stocks Congress members historically trade +- Compare unusual activity to later disclosures +- Detect patterns (timing, sectors, etc.) 
+ +--- + +## šŸ”„ **Two-Phase Monitoring System** + +### **Phase 1: Real-Time Market Monitoring** +Monitor unusual activity in stocks Congress trades: +- Unusual options flow +- Large block trades +- Volatility spikes +- Volume anomalies + +### **Phase 2: Retroactive Analysis (30-45 days later)** +When disclosures come in: +- Match disclosed trades to earlier unusual activity +- Identify if Congress bought BEFORE or AFTER spikes +- Calculate timing advantage (if any) +- Build pattern database + +--- + +## šŸ“Š **Implementation: Watchlist-Based Monitoring** + +### **Concept:** + +``` +Step 1: Congress Member Trades (Historical) + Nancy Pelosi often trades: NVDA, MSFT, GOOGL, AAPL + Dan Crenshaw often trades: XOM, CVX, LMT, BA + +Step 2: Create Monitoring Watchlist + Monitor these tickers in real-time for: + - Unusual options activity + - Large block trades + - Price/volume anomalies + +Step 3: When Disclosure Appears (30-45 days later) + Compare: + - Did they buy BEFORE unusual activity? (Suspicious) + - Did they buy AFTER? (Following market) + - What was the timing advantage? +``` + +--- + +## šŸ› ļø **Data Sources for Live Market Monitoring** + +### **Free/Low-Cost Options:** + +1. **Yahoo Finance (yfinance)** + - āœ… Real-time quotes (15-min delay free) + - āœ… Historical options data + - āœ… Volume data + - āŒ Not true real-time for options flow + +2. **Unusual Whales API** + - āœ… Options flow data + - āœ… Unusual activity alerts + - šŸ’° Paid ($50-200/month) + - https://unusualwhales.com/ + +3. **Tradier API** + - āœ… Real-time market data + - āœ… Options chains + - šŸ’° Paid but affordable ($10-50/month) + - https://tradier.com/ + +4. **FlowAlgo** + - āœ… Options flow tracking + - āœ… Dark pool data + - šŸ’° Paid ($99-399/month) + - https://www.flowalgo.com/ + +5. 
**Polygon.io** + - āœ… Real-time stock data + - āœ… Options data + - šŸ’° Free tier + paid plans + - https://polygon.io/ + +### **Best Free Option: Build Your Own with yfinance** + +Track volume/price changes every 5 minutes for congressional watchlist tickers. + +--- + +## šŸ’” **Practical Hybrid System** + +### **What We Can Build:** + +```python +# Pseudo-code for hybrid monitoring + +# 1. Get stocks Congress trades +congress_tickers = get_tickers_congress_trades() +# Result: ["NVDA", "MSFT", "TSLA", "AAPL", "SPY", ...] + +# 2. Monitor these tickers for unusual activity +while market_open(): + for ticker in congress_tickers: + current_data = get_realtime_data(ticker) + + if is_unusual_activity(current_data): + log_alert({ + "ticker": ticker, + "type": "unusual_volume", # or "price_spike", "options_flow" + "timestamp": now(), + "details": current_data + }) + +# 3. When disclosures appear (30-45 days later) +new_disclosures = fetch_congressional_trades() + +for disclosure in new_disclosures: + # Check if we saw unusual activity BEFORE their trade + prior_alerts = get_alerts_before_date( + ticker=disclosure.ticker, + before_date=disclosure.transaction_date + ) + + if prior_alerts: + # They bought BEFORE unusual activity = Potential inside info + flag_suspicious(disclosure, prior_alerts) + else: + # They bought AFTER unusual activity = Following market + flag_following(disclosure) +``` + +--- + +## šŸ“ˆ **Example: Nancy Pelosi NVDA Trade Analysis** + +### **Timeline:** + +``` +Nov 10, 2024: + šŸ”” ALERT: NVDA unusual call options activity + Volume: 10x average + Strike: $500 (2 weeks out) + +Nov 15, 2024: + šŸ’° Someone buys NVDA (unknown who at the time) + +Nov 18, 2024: + šŸ“° NVDA announces new AI chip + šŸ“ˆ Stock jumps 15% + +Dec 15, 2024: + šŸ“‹ Disclosure: Nancy Pelosi bought NVDA on Nov 15 + Value: $15,001-$50,000 + +ANALYSIS: + āœ… She bought AFTER unusual options activity (Nov 10) + ā“ She bought BEFORE announcement (Nov 18) + ā±ļø Timing: 3 days before 
major news + 🚩 Flag: Investigate if announcement was public knowledge +``` + +--- + +## šŸŽÆ **Recommended Approach** + +### **Phase 1: Build Congressional Ticker Watchlist** + +```python +# scripts/build_ticker_watchlist.py + +from pote.db import get_session +from pote.db.models import Trade, Security +from sqlalchemy import func + +def get_most_traded_tickers(limit=50): + """Get tickers Congress trades most frequently.""" + session = next(get_session()) + + results = ( + session.query( + Security.ticker, + func.count(Trade.id).label('trade_count') + ) + .join(Trade) + .group_by(Security.ticker) + .order_by(func.count(Trade.id).desc()) + .limit(limit) + .all() + ) + + return [r[0] for r in results] + +# Result: Top 50 tickers Congress trades +# Use these for real-time monitoring +``` + +### **Phase 2: Real-Time Monitoring (Simple)** + +```python +# scripts/monitor_congressional_tickers.py + +import yfinance as yf +from datetime import datetime, timedelta +import time + +def monitor_tickers(tickers, interval_minutes=5): + """Monitor tickers for unusual activity.""" + + baseline = {} # Store baseline metrics + + while True: + for ticker in tickers: + try: + stock = yf.Ticker(ticker) + current = stock.history(period="1d", interval="1m") + + if len(current) > 0: + latest = current.iloc[-1] + + # Check for unusual volume + avg_volume = current['Volume'].mean() + if latest['Volume'] > avg_volume * 3: + alert(f"šŸ”” {ticker}: Unusual volume spike!") + + # Check for price movement + price_change = (latest['Close'] - current['Open'].iloc[0]) / current['Open'].iloc[0] + if abs(price_change) > 0.05: # 5% move + alert(f"šŸ“ˆ {ticker}: {price_change:.2%} move today!") + + except Exception as e: + print(f"Error monitoring {ticker}: {e}") + + time.sleep(interval_minutes * 60) +``` + +### **Phase 3: Retroactive Analysis** + +When disclosures appear, analyze timing: + +```python +# scripts/analyze_trade_timing.py + +def analyze_disclosure_timing(disclosure): + """ + When a 
disclosure appears, check if there was unusual + activity BEFORE the trade date. + """ + + # Get alerts from 7 days before trade + lookback_start = disclosure.transaction_date - timedelta(days=7) + lookback_end = disclosure.transaction_date + + alerts = get_alerts_in_range( + ticker=disclosure.ticker, + start=lookback_start, + end=lookback_end + ) + + if alerts: + return { + "suspicious": True, + "reason": "Unusual activity before trade", + "alerts": alerts + } + + # Check if trade was before major price movement + post_trade_price = get_price_change( + ticker=disclosure.ticker, + start=disclosure.transaction_date, + days=30 + ) + + if post_trade_price > 0.10: # 10% gain + return { + "notable": True, + "reason": f"Stock up {post_trade_price:.1%} after trade", + "gain": post_trade_price + } +``` + +--- + +## 🚨 **Realistic Expectations** + +### **What This System Will Do:** +āœ… Monitor stocks Congress members historically trade +āœ… Alert on unusual market activity in those stocks +āœ… Retroactively correlate disclosures with earlier alerts +āœ… Identify timing patterns and potential advantages +āœ… Build database of congressional trading patterns + +### **What This System WON'T Do:** +āŒ Identify WHO is buying in real-time +āŒ Give you advance notice of congressional trades +āŒ Provide real-time inside information +āŒ Allow you to "front-run" Congress + +### **Legal & Ethical:** +āœ… All data is public +āœ… Analysis is retrospective +āœ… For research and transparency +āœ… Not market manipulation +āŒ Cannot and should not be used to replicate potentially illegal trades + +--- + +## šŸ“Š **Proposed Implementation** + +### **New Scripts to Create:** + +1. **`scripts/build_congressional_watchlist.py`** + - Analyzes historical trades + - Identifies most-traded tickers + - Creates monitoring watchlist + +2. **`scripts/monitor_market_live.py`** + - Monitors watchlist tickers + - Detects unusual activity + - Logs alerts to database + +3. 
**`scripts/analyze_disclosure_timing.py`** + - When new disclosures appear + - Checks for prior unusual activity + - Flags suspicious timing + +4. **`scripts/generate_timing_report.py`** + - Shows disclosures with unusual timing + - Calculates timing advantage + - Identifies patterns + +### **New Database Tables:** + +```sql +-- Track unusual market activity +CREATE TABLE market_alerts ( + id SERIAL PRIMARY KEY, + ticker VARCHAR(20), + alert_type VARCHAR(50), -- 'unusual_volume', 'price_spike', 'options_flow' + timestamp TIMESTAMP, + details JSONB, + created_at TIMESTAMP DEFAULT NOW() +); + +-- Link disclosures to prior alerts +CREATE TABLE disclosure_timing_analysis ( + id SERIAL PRIMARY KEY, + trade_id INTEGER REFERENCES trades(id), + suspicious_flag BOOLEAN, + timing_score DECIMAL(5,2), -- 0-100 score + prior_alerts JSONB, + post_trade_performance DECIMAL(10,4), + created_at TIMESTAMP DEFAULT NOW() +); +``` + +--- + +## šŸŽÆ **Summary** + +### **Your Question:** +> "Can we read live trades being made and compare them to a name?" + +### **Answer:** +āŒ **No** - Live trades are anonymous, can't identify individuals + +āœ… **BUT** - You CAN: +1. Monitor unusual activity in stocks Congress trades +2. Log these alerts in real-time +3. When disclosures appear (30-45 days later), correlate them +4. Identify if Congress bought BEFORE or AFTER unusual activity +5. Build patterns database of timing and performance + +### **This Gives You:** +- āœ… Transparency on timing advantages +- āœ… Pattern detection across officials +- āœ… Research-grade analysis +- āœ… Historical correlation data + +### **This Does NOT Give You:** +- āŒ Real-time identity of traders +- āŒ Advance notice of congressional trades +- āŒ Ability to "front-run" disclosures + +--- + +## šŸš€ **Would You Like Me To Build This?** + +I can create: +1. āœ… Real-time monitoring system for congressional tickers +2. āœ… Alert logging and analysis +3. āœ… Timing correlation when disclosures appear +4. 
āœ… Pattern detection and reporting + +This would be **Phase 2.5** of POTE - the "timing analysis" module. + +**Should I proceed with implementation?** + diff --git a/scripts/monitor_market.py b/scripts/monitor_market.py new file mode 100755 index 0000000..fbdd286 --- /dev/null +++ b/scripts/monitor_market.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +""" +Real-time market monitoring for congressional tickers. +Run this continuously or on a schedule to detect unusual activity. +""" + +import logging +import time +from datetime import datetime +from pathlib import Path + +import click + +from pote.db import get_session +from pote.monitoring.alert_manager import AlertManager +from pote.monitoring.market_monitor import MarketMonitor + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +@click.command() +@click.option("--tickers", help="Comma-separated list of tickers (default: congressional watchlist)") +@click.option("--interval", default=300, help="Scan interval in seconds (default: 300 = 5 minutes)") +@click.option("--once", is_flag=True, help="Run once and exit (no continuous monitoring)") +@click.option("--min-severity", default=5, help="Minimum severity to report (1-10)") +@click.option("--save-report", help="Save report to file") +@click.option("--lookback", default=5, help="Days of history to analyze (default: 5)") +def main(tickers, interval, once, min_severity, save_report, lookback): + """Monitor market for unusual activity in congressional tickers.""" + + session = next(get_session()) + monitor = MarketMonitor(session) + alert_mgr = AlertManager(session) + + # Parse tickers if provided + ticker_list = None + if tickers: + ticker_list = [t.strip().upper() for t in tickers.split(",")] + logger.info(f"Monitoring {len(ticker_list)} specified tickers") + else: + logger.info("Monitoring congressional watchlist") + + def run_scan(): + """Run a single scan.""" + 
logger.info("=" * 80) + logger.info(f"Starting market scan at {datetime.now()}") + logger.info("=" * 80) + + try: + # Scan for unusual activity + alerts = monitor.scan_watchlist(tickers=ticker_list, lookback_days=lookback) + + if alerts: + logger.info(f"\nšŸ”” Found {len(alerts)} alerts!") + + # Save to database + monitor.save_alerts(alerts) + + # Get MarketAlert objects for reporting + from pote.db.models import MarketAlert + + alert_objects = ( + session.query(MarketAlert) + .order_by(MarketAlert.timestamp.desc()) + .limit(len(alerts)) + .all() + ) + + # Filter by severity + filtered = alert_mgr.filter_alerts(alert_objects, min_severity=min_severity) + + if filtered: + # Generate report + report = alert_mgr.generate_summary_report(filtered, format="text") + print("\n" + report) + + # Save report if requested + if save_report: + Path(save_report).write_text(report) + logger.info(f"Report saved to {save_report}") + else: + logger.info(f"No alerts above severity {min_severity}") + else: + logger.info("āœ… No unusual activity detected") + + except Exception as e: + logger.error(f"Error during scan: {e}", exc_info=True) + + logger.info("=" * 80) + logger.info(f"Scan complete at {datetime.now()}") + logger.info("=" * 80) + + # Run scan + run_scan() + + # Continuous monitoring mode + if not once: + logger.info(f"\nšŸ”„ Continuous monitoring enabled (interval: {interval}s)") + logger.info("Press Ctrl+C to stop\n") + + try: + while True: + time.sleep(interval) + run_scan() + except KeyboardInterrupt: + logger.info("\n\nā¹ļø Monitoring stopped by user") + else: + logger.info("\nāœ… Single scan complete (use --interval for continuous monitoring)") + + +if __name__ == "__main__": + main() + diff --git a/src/pote/db/models.py b/src/pote/db/models.py index c381de6..03739c0 100644 --- a/src/pote/db/models.py +++ b/src/pote/db/models.py @@ -13,6 +13,7 @@ from sqlalchemy import ( ForeignKey, Index, Integer, + JSON, String, Text, UniqueConstraint, @@ -218,3 +219,50 @@ class 
MetricTrade(Base): __table_args__ = ( UniqueConstraint("trade_id", "calc_date", "calc_version", name="uq_metrics_trade"), ) + + +class MarketAlert(Base): + """ + Real-time market activity alerts. + Tracks unusual volume, price movements, and other anomalies. + """ + + __tablename__ = "market_alerts" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + ticker: Mapped[str] = mapped_column(String(20), nullable=False, index=True) + alert_type: Mapped[str] = mapped_column( + String(50), nullable=False + ) # 'unusual_volume', 'price_spike', 'options_flow', etc. + timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True) + + # Alert details (stored as JSON) + details: Mapped[dict | None] = mapped_column(JSON) + + # Metrics at time of alert + price: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 4)) + volume: Mapped[int | None] = mapped_column(Integer) + change_pct: Mapped[Decimal | None] = mapped_column( + DECIMAL(10, 4) + ) # Price change % + + # Severity scoring + severity: Mapped[int | None] = mapped_column(Integer) # 1-10 scale + + # Metadata + source: Mapped[str] = mapped_column(String(50), default="market_monitor") + created_at: Mapped[datetime] = mapped_column( + DateTime, default=lambda: datetime.now(timezone.utc) + ) + + # Indexes for efficient queries + __table_args__ = ( + Index("ix_market_alerts_ticker_timestamp", "ticker", "timestamp"), + Index("ix_market_alerts_alert_type", "alert_type"), + ) + + def __repr__(self) -> str: + return ( + f"<MarketAlert(ticker={self.ticker!r}, type={self.alert_type!r}, severity={self.severity})>" + ) diff --git a/src/pote/monitoring/__init__.py b/src/pote/monitoring/__init__.py new file mode 100644 index 0000000..08e91a7 --- /dev/null +++ b/src/pote/monitoring/__init__.py @@ -0,0 +1,10 @@ +""" +Market monitoring module. +Real-time tracking of unusual market activity.
+""" + +from .market_monitor import MarketMonitor +from .alert_manager import AlertManager + +__all__ = ["MarketMonitor", "AlertManager"] + diff --git a/src/pote/monitoring/alert_manager.py b/src/pote/monitoring/alert_manager.py new file mode 100644 index 0000000..18fa003 --- /dev/null +++ b/src/pote/monitoring/alert_manager.py @@ -0,0 +1,244 @@ +""" +Alert management and notification system. +Handles alert filtering, formatting, and delivery. +""" + +import logging +from datetime import datetime, timezone +from typing import Any + +from sqlalchemy.orm import Session + +from pote.db.models import MarketAlert + +logger = logging.getLogger(__name__) + + +class AlertManager: + """Manage and deliver market alerts.""" + + def __init__(self, session: Session): + """Initialize alert manager.""" + self.session = session + + def format_alert_text(self, alert: MarketAlert) -> str: + """ + Format alert as human-readable text. + + Args: + alert: MarketAlert object + + Returns: + Formatted alert string + """ + emoji_map = { + "unusual_volume": "šŸ“Š", + "price_spike": "šŸš€", + "price_drop": "šŸ“‰", + "high_volatility": "⚔", + "options_flow": "šŸ’°", + } + + emoji = emoji_map.get(alert.alert_type, "šŸ””") + severity_stars = "⭐" * min(alert.severity or 1, 5) + + lines = [ + f"{emoji} {alert.ticker} - {alert.alert_type.upper().replace('_', ' ')}", + f" Severity: {severity_stars} ({alert.severity}/10)", + f" Time: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}", + f" Price: ${float(alert.price):.2f}" if alert.price else "", + f" Volume: {alert.volume:,}" if alert.volume else "", + f" Change: {float(alert.change_pct):+.2f}%" if alert.change_pct else "", + ] + + # Add details + if alert.details: + lines.append(" Details:") + for key, value in alert.details.items(): + if isinstance(value, (int, float)): + if "pct" in key.lower() or "change" in key.lower(): + lines.append(f" {key}: {value:+.2f}%") + else: + lines.append(f" {key}: {value:,.2f}") + else: + lines.append(f" {key}: 
{value}") + + return "\n".join(line for line in lines if line) + + def format_alert_html(self, alert: MarketAlert) -> str: + """ + Format alert as HTML. + + Args: + alert: MarketAlert object + + Returns: + HTML formatted alert + """ + severity_class = "high" if (alert.severity or 0) >= 7 else "medium" if (alert.severity or 0) >= 4 else "low" + + html = f""" +
+        <div class="alert severity-{severity_class}">
+            <h3>{alert.ticker} - {alert.alert_type.replace('_', ' ').title()}</h3>
+            <p class="timestamp">{alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}</p>
+            <p class="severity">Severity: {alert.severity}/10</p>
+            <p class="metrics">
+                Price: ${float(alert.price):.2f}
+                Volume: {alert.volume:,}
+                Change: {float(alert.change_pct):+.2f}%
+            </p>
+        </div>
+ """ + return html + + def filter_alerts( + self, + alerts: list[MarketAlert], + min_severity: int = 5, + tickers: list[str] | None = None, + alert_types: list[str] | None = None, + ) -> list[MarketAlert]: + """ + Filter alerts by criteria. + + Args: + alerts: List of alerts + min_severity: Minimum severity threshold + tickers: Only include these tickers (None = all) + alert_types: Only include these types (None = all) + + Returns: + Filtered list of alerts + """ + filtered = alerts + + # Filter by severity + filtered = [a for a in filtered if (a.severity or 0) >= min_severity] + + # Filter by ticker + if tickers: + ticker_set = set(t.upper() for t in tickers) + filtered = [a for a in filtered if a.ticker.upper() in ticker_set] + + # Filter by alert type + if alert_types: + type_set = set(alert_types) + filtered = [a for a in filtered if a.alert_type in type_set] + + return filtered + + def generate_summary_report( + self, alerts: list[MarketAlert], format: str = "text" + ) -> str: + """ + Generate summary report of alerts. + + Args: + alerts: List of alerts + format: Output format ('text' or 'html') + + Returns: + Formatted summary report + """ + if format == "html": + return self._generate_html_summary(alerts) + else: + return self._generate_text_summary(alerts) + + def _generate_text_summary(self, alerts: list[MarketAlert]) -> str: + """Generate text summary report.""" + if not alerts: + return "šŸ“­ No alerts to report." 
+ + lines = [ + "=" * 80, + f" MARKET ACTIVITY ALERTS - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC", + f" {len(alerts)} Alerts", + "=" * 80, + "", + ] + + # Group by ticker + by_ticker: dict[str, list[MarketAlert]] = {} + for alert in alerts: + if alert.ticker not in by_ticker: + by_ticker[alert.ticker] = [] + by_ticker[alert.ticker].append(alert) + + # Sort tickers by max severity + sorted_tickers = sorted( + by_ticker.keys(), + key=lambda t: max((a.severity or 0) for a in by_ticker[t]), + reverse=True, + ) + + for ticker in sorted_tickers: + ticker_alerts = by_ticker[ticker] + max_sev = max((a.severity or 0) for a in ticker_alerts) + + lines.append("─" * 80) + lines.append(f"šŸŽÆ {ticker} - {len(ticker_alerts)} alerts (Max Severity: {max_sev}/10)") + lines.append("─" * 80) + + for alert in sorted( + ticker_alerts, key=lambda a: a.severity or 0, reverse=True + ): + lines.append("") + lines.append(self.format_alert_text(alert)) + + lines.append("") + + # Summary statistics + lines.append("=" * 80) + lines.append("šŸ“Š SUMMARY") + lines.append("=" * 80) + lines.append("") + lines.append(f"Total Alerts: {len(alerts)}") + lines.append(f"Unique Tickers: {len(by_ticker)}") + + # Alert type breakdown + type_counts: dict[str, int] = {} + for alert in alerts: + type_counts[alert.alert_type] = type_counts.get(alert.alert_type, 0) + 1 + + lines.append("\nAlert Types:") + for alert_type, count in sorted( + type_counts.items(), key=lambda x: x[1], reverse=True + ): + lines.append(f" {alert_type.replace('_', ' ').title():20s}: {count}") + + # Top severity alerts + lines.append("\nTop 5 Highest Severity:") + top_alerts = sorted(alerts, key=lambda a: a.severity or 0, reverse=True)[:5] + for alert in top_alerts: + lines.append( + f" {alert.ticker:6s} - {alert.alert_type:20s} (Severity: {alert.severity}/10)" + ) + + lines.append("") + lines.append("=" * 80) + + return "\n".join(lines) + + def _generate_html_summary(self, alerts: list[MarketAlert]) -> str: + 
"""Generate HTML summary report.""" + html_parts = [ + "", + f"

Market Activity Alerts

", + f"

{len(alerts)} Alerts | {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC

", + ] + + for alert in sorted(alerts, key=lambda a: a.severity or 0, reverse=True): + html_parts.append(self.format_alert_html(alert)) + + html_parts.append("") + return "\n".join(html_parts) + diff --git a/src/pote/monitoring/market_monitor.py b/src/pote/monitoring/market_monitor.py new file mode 100644 index 0000000..bcbb4c5 --- /dev/null +++ b/src/pote/monitoring/market_monitor.py @@ -0,0 +1,281 @@ +""" +Real-time market monitoring for congressional tickers. +Detects unusual activity: volume spikes, price movements, volatility. +""" + +import logging +from datetime import datetime, timedelta, timezone +from decimal import Decimal +from typing import Any + +import yfinance as yf +from sqlalchemy.orm import Session + +from pote.db.models import MarketAlert, Security, Trade + +logger = logging.getLogger(__name__) + + +class MarketMonitor: + """Monitor stocks for unusual market activity.""" + + def __init__(self, session: Session): + """Initialize market monitor.""" + self.session = session + + def get_congressional_watchlist(self, limit: int = 50) -> list[str]: + """ + Get list of most-traded tickers by Congress. + + Args: + limit: Maximum number of tickers to return + + Returns: + List of ticker symbols + """ + from sqlalchemy import func + + result = ( + self.session.query(Security.ticker, func.count(Trade.id).label("count")) + .join(Trade) + .group_by(Security.ticker) + .order_by(func.count(Trade.id).desc()) + .limit(limit) + .all() + ) + + tickers = [r[0] for r in result] + logger.info(f"Built watchlist of {len(tickers)} tickers from congressional trades") + return tickers + + def check_ticker(self, ticker: str, lookback_days: int = 5) -> list[dict[str, Any]]: + """ + Check a single ticker for unusual activity. 
+ + Args: + ticker: Stock ticker symbol + lookback_days: Days of history to analyze + + Returns: + List of alerts detected + """ + alerts = [] + + try: + stock = yf.Ticker(ticker) + + # Get recent history + hist = stock.history(period=f"{lookback_days}d", interval="1d") + + if len(hist) < 2: + logger.warning(f"Insufficient data for {ticker}") + return alerts + + # Calculate baseline metrics + avg_volume = hist["Volume"].mean() + avg_price_change = hist["Close"].pct_change().abs().mean() + + # Get latest data + latest = hist.iloc[-1] + prev = hist.iloc[-2] + + current_volume = latest["Volume"] + current_price = latest["Close"] + price_change = (current_price - prev["Close"]) / prev["Close"] + + # Check for unusual volume (3x average) + if current_volume > avg_volume * 3 and avg_volume > 0: + severity = min(10, int((current_volume / avg_volume) - 2)) + alerts.append( + { + "ticker": ticker, + "alert_type": "unusual_volume", + "timestamp": datetime.now(timezone.utc), + "details": { + "current_volume": int(current_volume), + "avg_volume": int(avg_volume), + "multiplier": round(current_volume / avg_volume, 2), + }, + "price": Decimal(str(current_price)), + "volume": int(current_volume), + "change_pct": Decimal(str(price_change * 100)), + "severity": severity, + } + ) + + # Check for significant price movement (>5%) + if abs(price_change) > 0.05: + severity = min(10, int(abs(price_change) * 100 / 2)) + alerts.append( + { + "ticker": ticker, + "alert_type": "price_spike" + if price_change > 0 + else "price_drop", + "timestamp": datetime.now(timezone.utc), + "details": { + "current_price": float(current_price), + "prev_price": float(prev["Close"]), + "change_pct": round(price_change * 100, 2), + }, + "price": Decimal(str(current_price)), + "volume": int(current_volume), + "change_pct": Decimal(str(price_change * 100)), + "severity": severity, + } + ) + + # Check for unusual volatility (price swings) + if len(hist) >= 5: + recent_volatility = 
hist["Close"].iloc[-5:].pct_change().abs().mean() + if recent_volatility > avg_price_change * 2 and avg_price_change > 0: + severity = min( + 10, int((recent_volatility / avg_price_change) - 1) + ) + alerts.append( + { + "ticker": ticker, + "alert_type": "high_volatility", + "timestamp": datetime.now(timezone.utc), + "details": { + "recent_volatility": round(recent_volatility * 100, 2), + "avg_volatility": round(avg_price_change * 100, 2), + "multiplier": round(recent_volatility / avg_price_change, 2), + }, + "price": Decimal(str(current_price)), + "volume": int(current_volume), + "change_pct": Decimal(str(price_change * 100)), + "severity": severity, + } + ) + + except Exception as e: + logger.error(f"Error checking {ticker}: {e}") + + return alerts + + def scan_watchlist( + self, tickers: list[str] | None = None, lookback_days: int = 5 + ) -> list[dict[str, Any]]: + """ + Scan multiple tickers for unusual activity. + + Args: + tickers: List of tickers to scan (None = use congressional watchlist) + lookback_days: Days of history to analyze + + Returns: + List of all alerts detected + """ + if tickers is None: + tickers = self.get_congressional_watchlist() + + all_alerts = [] + + logger.info(f"Scanning {len(tickers)} tickers for unusual activity...") + + for ticker in tickers: + alerts = self.check_ticker(ticker, lookback_days=lookback_days) + all_alerts.extend(alerts) + + if alerts: + logger.info( + f"šŸ”” {ticker}: {len(alerts)} alerts - " + + ", ".join(a["alert_type"] for a in alerts) + ) + + logger.info(f"Scan complete. Found {len(all_alerts)} total alerts.") + return all_alerts + + def save_alerts(self, alerts: list[dict[str, Any]]) -> int: + """ + Save alerts to database. 
+ + Args: + alerts: List of alert dictionaries + + Returns: + Number of alerts saved + """ + saved = 0 + + for alert_data in alerts: + alert = MarketAlert(**alert_data) + self.session.add(alert) + saved += 1 + + self.session.commit() + logger.info(f"Saved {saved} alerts to database") + return saved + + def get_recent_alerts( + self, + ticker: str | None = None, + days: int = 7, + alert_type: str | None = None, + min_severity: int = 0, + ) -> list[MarketAlert]: + """ + Query recent alerts from database. + + Args: + ticker: Filter by ticker (None = all) + days: Look back this many days + alert_type: Filter by alert type (None = all) + min_severity: Minimum severity level + + Returns: + List of MarketAlert objects + """ + since = datetime.now(timezone.utc) - timedelta(days=days) + + query = self.session.query(MarketAlert).filter(MarketAlert.timestamp >= since) + + if ticker: + query = query.filter(MarketAlert.ticker == ticker) + + if alert_type: + query = query.filter(MarketAlert.alert_type == alert_type) + + if min_severity > 0: + query = query.filter(MarketAlert.severity >= min_severity) + + return query.order_by(MarketAlert.timestamp.desc()).all() + + def get_ticker_alert_summary(self, days: int = 30) -> dict[str, dict]: + """ + Get summary of alerts by ticker. 
+ + Args: + days: Look back this many days + + Returns: + Dict mapping ticker to alert summary + """ + since = datetime.now(timezone.utc) - timedelta(days=days) + + from sqlalchemy import func + + results = ( + self.session.query( + MarketAlert.ticker, + func.count(MarketAlert.id).label("alert_count"), + func.avg(MarketAlert.severity).label("avg_severity"), + func.max(MarketAlert.severity).label("max_severity"), + ) + .filter(MarketAlert.timestamp >= since) + .group_by(MarketAlert.ticker) + .order_by(func.count(MarketAlert.id).desc()) + .all() + ) + + summary = {} + for r in results: + summary[r[0]] = { + "alert_count": r[1], + "avg_severity": round(float(r[2]), 2) if r[2] else 0, + "max_severity": r[3], + } + + return summary +