POTE/src/pote/db/models.py
ilia cfaf38b0be Phase 1: Real-Time Market Monitoring System
COMPLETE: Real-time unusual activity detection for congressional tickers

New Database Model:
- MarketAlert: Stores unusual market activity alerts
  * Tracks volume spikes, price movements, volatility
  * JSON details field for flexible data storage
  * Severity scoring (1-10 scale)
  * Indexed for efficient queries by ticker/timestamp

New Modules:
- src/pote/monitoring/market_monitor.py: Core monitoring engine
  * get_congressional_watchlist(): Top 50 most-traded tickers
  * check_ticker(): Analyze single stock for unusual activity
  * scan_watchlist(): Batch analysis of multiple tickers
  * Detection logic:
    - Unusual volume (3x average)
    - Price spikes/drops (>5%)
    - High volatility (2x normal)
  * save_alerts(): Persist to database
  * get_recent_alerts(): Query historical alerts

- src/pote/monitoring/alert_manager.py: Alert formatting & filtering
  * format_alert_text(): Human-readable output
  * format_alert_html(): HTML email format
  * filter_alerts(): By severity, ticker, type
  * generate_summary_report(): Text/HTML reports

Scripts:
- scripts/monitor_market.py: CLI monitoring tool
  * Continuous monitoring mode (--interval)
  * One-time scan (--once)
  * Custom ticker lists or auto-detect congressional watchlist
  * Severity filtering (--min-severity)
  * Report generation and saving

Migrations:
- alembic/versions/f44014715b40_add_market_alerts_table.py

Documentation:
- docs/11_live_market_monitoring.md: Complete explanation
  * Why you can't track WHO is trading
  * What IS possible (timing analysis)
  * How hybrid monitoring works
  * Data sources and APIs

Usage:
  # Monitor congressional tickers (one-time scan)
  python scripts/monitor_market.py --once

  # Continuous monitoring (every 5 minutes)
  python scripts/monitor_market.py --interval 300

  # Monitor specific tickers
  python scripts/monitor_market.py --tickers NVDA,MSFT,AAPL --once

Next Steps (Phase 2):
- Disclosure correlation engine
- Timing advantage calculator
- Suspicious trade flagging
2025-12-15 15:10:49 -05:00

269 lines
9.9 KiB
Python

"""
SQLAlchemy ORM models for POTE.
Matches the schema defined in docs/02_data_model.md.
"""
from datetime import date, datetime, timezone
from decimal import Decimal
from sqlalchemy import (
DECIMAL,
Date,
DateTime,
ForeignKey,
Index,
Integer,
JSON,
String,
Text,
UniqueConstraint,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from pote.db import Base
class Official(Base):
"""Government officials (Congress members, etc.)."""
__tablename__ = "officials"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
chamber: Mapped[str | None] = mapped_column(String(50)) # "House", "Senate", etc.
party: Mapped[str | None] = mapped_column(String(50))
state: Mapped[str | None] = mapped_column(String(2))
bioguide_id: Mapped[str | None] = mapped_column(String(20), unique=True)
external_ids: Mapped[str | None] = mapped_column(Text) # JSON blob for other IDs
created_at: Mapped[datetime] = mapped_column(
DateTime, default=lambda: datetime.now(timezone.utc)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
# Relationships
trades: Mapped[list["Trade"]] = relationship("Trade", back_populates="official")
def __repr__(self) -> str:
return f"<Official(id={self.id}, name='{self.name}', chamber='{self.chamber}')>"
class Security(Base):
"""Securities (stocks, bonds, etc.)."""
__tablename__ = "securities"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
ticker: Mapped[str] = mapped_column(String(20), nullable=False, unique=True, index=True)
name: Mapped[str | None] = mapped_column(String(200))
exchange: Mapped[str | None] = mapped_column(String(50))
sector: Mapped[str | None] = mapped_column(String(100))
industry: Mapped[str | None] = mapped_column(String(100))
asset_type: Mapped[str] = mapped_column(String(50), default="stock") # stock, bond, etc.
created_at: Mapped[datetime] = mapped_column(
DateTime, default=lambda: datetime.now(timezone.utc)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
# Relationships
trades: Mapped[list["Trade"]] = relationship("Trade", back_populates="security")
prices: Mapped[list["Price"]] = relationship("Price", back_populates="security")
def __repr__(self) -> str:
return f"<Security(id={self.id}, ticker='{self.ticker}', name='{self.name}')>"
class Trade(Base):
"""Trades disclosed by officials."""
__tablename__ = "trades"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
official_id: Mapped[int] = mapped_column(ForeignKey("officials.id"), nullable=False, index=True)
security_id: Mapped[int] = mapped_column(
ForeignKey("securities.id"), nullable=False, index=True
)
# Core trade fields
source: Mapped[str] = mapped_column(String(50), nullable=False) # "quiver", "fmp", etc.
external_id: Mapped[str | None] = mapped_column(String(100)) # source-specific ID
transaction_date: Mapped[date] = mapped_column(Date, nullable=False, index=True)
filing_date: Mapped[date | None] = mapped_column(Date, index=True)
side: Mapped[str] = mapped_column(String(20), nullable=False) # "buy", "sell", "exchange"
# Amount (often disclosed as a range)
value_min: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 2))
value_max: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 2))
amount: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 2)) # shares/units if available
currency: Mapped[str] = mapped_column(String(3), default="USD")
# Quality flags (JSON or enum list)
quality_flags: Mapped[str | None] = mapped_column(Text) # e.g., "range_only,delayed_filing"
created_at: Mapped[datetime] = mapped_column(
DateTime, default=lambda: datetime.now(timezone.utc)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
)
# Relationships
official: Mapped["Official"] = relationship("Official", back_populates="trades")
security: Mapped["Security"] = relationship("Security", back_populates="trades")
# Constraints
__table_args__ = (
Index("ix_trades_official_date", "official_id", "transaction_date"),
Index("ix_trades_security_date", "security_id", "transaction_date"),
UniqueConstraint(
"source", "external_id", name="uq_trades_source_external_id"
), # dedup by source ID
)
def __repr__(self) -> str:
return (
f"<Trade(id={self.id}, official_id={self.official_id}, "
f"ticker={self.security.ticker if self.security else 'N/A'}, "
f"side='{self.side}', date={self.transaction_date})>"
)
class Price(Base):
"""Daily price data for securities."""
__tablename__ = "prices"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
security_id: Mapped[int] = mapped_column(
ForeignKey("securities.id"), nullable=False, index=True
)
date: Mapped[date] = mapped_column(Date, nullable=False, index=True)
open: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 4))
high: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 4))
low: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 4))
close: Mapped[Decimal] = mapped_column(DECIMAL(15, 4), nullable=False)
volume: Mapped[int | None] = mapped_column(Integer)
adjusted_close: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 4))
source: Mapped[str] = mapped_column(String(50), default="yfinance")
created_at: Mapped[datetime] = mapped_column(
DateTime, default=lambda: datetime.now(timezone.utc)
)
# Relationships
security: Mapped["Security"] = relationship("Security", back_populates="prices")
# Constraints
__table_args__ = (UniqueConstraint("security_id", "date", name="uq_prices_security_date"),)
def __repr__(self) -> str:
return f"<Price(security_id={self.security_id}, date={self.date}, close={self.close})>"
# Future analytics models (stubs for now, will implement in Phase 2)
class MetricOfficial(Base):
"""Aggregate metrics per official (Phase 2)."""
__tablename__ = "metrics_official"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
official_id: Mapped[int] = mapped_column(ForeignKey("officials.id"), nullable=False, index=True)
calc_date: Mapped[date] = mapped_column(Date, nullable=False)
calc_version: Mapped[str] = mapped_column(String(20), nullable=False)
# Placeholder metric fields (will expand in Phase 2)
trade_count: Mapped[int | None] = mapped_column(Integer)
avg_abnormal_return_1m: Mapped[Decimal | None] = mapped_column(DECIMAL(10, 6))
cluster_label: Mapped[str | None] = mapped_column(String(50))
created_at: Mapped[datetime] = mapped_column(
DateTime, default=lambda: datetime.now(timezone.utc)
)
__table_args__ = (
UniqueConstraint("official_id", "calc_date", "calc_version", name="uq_metrics_official"),
)
class MetricTrade(Base):
"""Per-trade metrics (abnormal returns, etc., Phase 2)."""
__tablename__ = "metrics_trade"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
trade_id: Mapped[int] = mapped_column(ForeignKey("trades.id"), nullable=False, index=True)
calc_date: Mapped[date] = mapped_column(Date, nullable=False)
calc_version: Mapped[str] = mapped_column(String(20), nullable=False)
# Placeholder metric fields
return_1m: Mapped[Decimal | None] = mapped_column(DECIMAL(10, 6))
abnormal_return_1m: Mapped[Decimal | None] = mapped_column(DECIMAL(10, 6))
signal_flags: Mapped[str | None] = mapped_column(Text) # JSON list
created_at: Mapped[datetime] = mapped_column(
DateTime, default=lambda: datetime.now(timezone.utc)
)
__table_args__ = (
UniqueConstraint("trade_id", "calc_date", "calc_version", name="uq_metrics_trade"),
)
class MarketAlert(Base):
"""
Real-time market activity alerts.
Tracks unusual volume, price movements, and other anomalies.
"""
__tablename__ = "market_alerts"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
ticker: Mapped[str] = mapped_column(String(20), nullable=False, index=True)
alert_type: Mapped[str] = mapped_column(
String(50), nullable=False
) # 'unusual_volume', 'price_spike', 'options_flow', etc.
timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True)
# Alert details (stored as JSON)
details: Mapped[dict | None] = mapped_column(JSON)
# Metrics at time of alert
price: Mapped[Decimal | None] = mapped_column(DECIMAL(15, 4))
volume: Mapped[int | None] = mapped_column(Integer)
change_pct: Mapped[Decimal | None] = mapped_column(
DECIMAL(10, 4)
) # Price change %
# Severity scoring
severity: Mapped[int | None] = mapped_column(Integer) # 1-10 scale
# Metadata
source: Mapped[str] = mapped_column(String(50), default="market_monitor")
created_at: Mapped[datetime] = mapped_column(
DateTime, default=lambda: datetime.now(timezone.utc)
)
# Indexes for efficient queries
__table_args__ = (
Index("ix_market_alerts_ticker_timestamp", "ticker", "timestamp"),
Index("ix_market_alerts_alert_type", "alert_type"),
)
def __repr__(self) -> str:
return (
f"<MarketAlert(ticker='{self.ticker}', type='{self.alert_type}', "
f"timestamp={self.timestamp}, severity={self.severity})>"
)