# Capitol Trades Data Pipeline
An automated data engineering pipeline that scrapes, processes, and serves congressional stock trading disclosures through a normalized database and RESTful API.
## 📖 Overview
This project is a fully automated web scraping system that collects trade disclosure data from Capitol Trades, enriches it with real-time market data from financial APIs, and stores it in a normalized PostgreSQL database. The data is exposed through a secure REST API with role-based access control, and visualized through an interactive web dashboard.
The system runs as a distributed scraping network across multiple Raspberry Pi devices, with scheduled jobs ensuring data freshness and comprehensive logging for monitoring and debugging.
## ✨ Key Features
- 🔄 Automated Web Scraping — Playwright-based browser automation with retry logic and error handling
- 📡 API Integration — Real-time market data from FinnHub and CoinMarketCap APIs
- 🗃️ Normalized Database — Star schema design with dimension tables and fact tables
- 🔐 Secure REST API — Role-based access with API key authentication
- 📊 Interactive Dashboard — Searchable, filterable, paginated data views
- 📅 Scheduled Execution — Cron-based automation with comprehensive logging
## 🛠️ Tech Stack
| Category | Technology |
|---|---|
| Language | Python 3.8+ |
| Web Scraping | Playwright (Chromium headless browser) |
| Database | PostgreSQL 14+ with psycopg2 |
| API Framework | FastAPI with Pydantic validation |
| External APIs | FinnHub (stocks), CoinMarketCap (crypto) |
| Scheduling | Cron jobs / systemd timers |
| Logging | Python logging module with file handlers |
| Infrastructure | Raspberry Pi cluster (distributed scraping) |
## 🏗️ System Architecture

```text
┌──────────────────────────────────────────────────────────┐
│                  DATA COLLECTION LAYER                   │
│  Playwright Scraper (Capitol Trades)                     │
│  FinnHub Stock API (prices)                              │
│  CoinMarketCap Crypto API (prices/tickers)               │
├──────────────────────────────────────────────────────────┤
│                  DATA PROCESSING LAYER                   │
│  Python ETL pipeline:                                    │
│  • Date parsing & normalization  • Ticker validation     │
│  • Fuzzy matching (difflib)      • Duplicate detection   │
│  • Options parsing (calls/puts)  • Error handling/retries│
├──────────────────────────────────────────────────────────┤
│                    DATA STORAGE LAYER                    │
│  PostgreSQL:                                             │
│  • Dimensions: politicians, parties, chambers, tickers   │
│  • Fact: normalized_trades (FKs to all dimensions)       │
│  • Time-series: prices                                   │
├──────────────────────────────────────────────────────────┤
│                    API & PRESENTATION                    │
│  FastAPI REST: GET/POST endpoints, API key auth,         │
│                role-based access                         │
│  Flask Dashboard: tables, charts, pagination & filtering │
└──────────────────────────────────────────────────────────┘
```
## 🗄️ Database Schema
The database follows a normalized star schema design, separating dimension tables (politicians, parties, chambers, tickers) from the central fact table (normalized_trades). This architecture enables efficient queries, reduces data redundancy, and supports complex analytics.
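As a concrete illustration, the star layout can be sketched end to end. The snippet below uses SQLite in place of PostgreSQL so it is self-contained, and the column lists are abbreviated relative to the real schema:

```python
import sqlite3

# Miniature version of the star schema: three dimension tables and the
# normalized_trades fact table referencing them by foreign key.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.executescript("""
CREATE TABLE politicians (id INTEGER PRIMARY KEY, name TEXT UNIQUE);
CREATE TABLE parties     (id INTEGER PRIMARY KEY, name TEXT UNIQUE);
CREATE TABLE tickers     (id INTEGER PRIMARY KEY, symbol TEXT UNIQUE, category TEXT);
CREATE TABLE normalized_trades (
    id            INTEGER PRIMARY KEY,
    politician_id INTEGER REFERENCES politicians(id),
    party_id      INTEGER REFERENCES parties(id),
    ticker_id     INTEGER REFERENCES tickers(id),
    traded_date   TEXT
);
""")

# One row per dimension, then a fact row that only stores the keys.
cur.execute("INSERT INTO politicians (name) VALUES (?)", ("Nancy Pelosi",))
cur.execute("INSERT INTO parties (name) VALUES (?)", ("Democrat",))
cur.execute("INSERT INTO tickers (symbol, category) VALUES (?, ?)", ("AAPL", "stock"))
cur.execute(
    "INSERT INTO normalized_trades (politician_id, party_id, ticker_id, traded_date) "
    "VALUES (1, 1, 1, '2024-01-15')"
)

# Analytics queries join the fact table back out to its dimensions.
row = cur.execute("""
    SELECT p.name, t.symbol, nt.traded_date
    FROM normalized_trades nt
    JOIN politicians p ON p.id = nt.politician_id
    JOIN tickers t     ON t.id = nt.ticker_id
""").fetchone()
print(row)  # ('Nancy Pelosi', 'AAPL', '2024-01-15')
```

Keeping only integer keys in the fact table is what makes it narrow and cheap to scan, while dimension values are stored exactly once.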
### Dimension Tables

| Table | Purpose |
|---|---|
| `politicians` | Unique list of politicians involved in trades |
| `parties` | Political party affiliations (Democrat, Republican, etc.) |
| `chambers` | Legislative chambers (House, Senate) |
| `tickers` | Unified asset registry (stocks, crypto, other) |
| `owners` | Trade attribution (self, spouse, child) |
| `trade_types` | Transaction types (Purchase, Sale, Exchange) |
| `trade_sizes` | Value ranges ($1K-$15K, $15K-$50K, etc.) |
### Fact Tables

| Table | Purpose |
|---|---|
| `normalized_trades` | Central trade records with foreign keys to all dimensions |
| `option_trade_details` | Extended metadata for options trades (strike, expiry, call/put) |
| `prices` | Time-series price data for all tracked tickers |
## 🕷️ Scraper Pipeline
The scraper uses Playwright to automate a headless Chromium browser, navigating through paginated trade listings and extracting structured data. The pipeline includes intelligent error handling, duplicate detection, and automatic ticker validation.
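The retry behaviour can be sketched as a small wrapper around any page-fetching call; `with_retries` and `flaky_fetch` are illustrative names here, not the pipeline's actual functions:

```python
import time

def with_retries(fetch_page, page, max_retries=3, backoff=2.0):
    """Call fetch_page(page), retrying up to max_retries times.

    In the real pipeline the wrapped call is a Playwright navigation;
    here it is any callable, so the sketch stays self-contained.
    """
    for attempt in range(1, max_retries + 1):
        try:
            return fetch_page(page)
        except Exception:
            if attempt == max_retries:
                raise  # exhausted all attempts; surface the error
            time.sleep(backoff * attempt)  # linear backoff doubles as rate limiting

# Example: a fetcher that fails twice before succeeding on attempt 3.
calls = {"n": 0}
def flaky_fetch(page):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient network error")
    return f"page {page}: 100 trades"

print(with_retries(flaky_fetch, 1, backoff=0.0))  # page 1: 100 trades
```

Sleeping between attempts serves both purposes listed below: recovering from transient failures and spacing out requests to avoid blocks.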
### 🔄 Data Collection
- Headless browser automation
- Pagination handling (100 per page)
- Configurable retry logic (3 attempts)
- Rate limiting to avoid blocks
### 🧹 Data Processing
- Date parsing with fuzzy matching
- Ticker validation via FinnHub
- Options contract parsing
- Duplicate trade detection
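The fuzzy-matching step leans on Python's standard `difflib`; a minimal sketch of reconciling a scraped symbol against known tickers (the candidate list is illustrative) might look like:

```python
import difflib

# Known symbols from the tickers dimension table (illustrative subset).
known_tickers = ["AAPL", "MSFT", "GOOGL", "TSLA", "NVDA"]

def resolve_ticker(raw, candidates, cutoff=0.6):
    """Return the closest known ticker for a scraped string, or None."""
    matches = difflib.get_close_matches(raw.upper(), candidates, n=1, cutoff=cutoff)
    return matches[0] if matches else None

print(resolve_ticker("APL", known_tickers))     # AAPL
print(resolve_ticker("XYZ123", known_tickers))  # None
```

The `cutoff` ratio controls how aggressive the matching is; anything below it falls through to `None` and can be routed to FinnHub validation instead.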
### 📝 Logging & Monitoring
- Separate log files per component
- Debug, info, and error levels
- Insert tracking and auditing
- Runtime performance metrics
```python
import logging

# Scraper configuration
MAX_RETRIES = 3
BASE_URL = "https://www.capitoltrades.com/trades?pageSize=100&page="
STOP_ON_DUPLICATE = False  # Continue or halt on duplicate detection

# Logging setup
capitol_scraper_logger = logging.getLogger("capitol_scraper_normalized")
db_inserts_logger = logging.getLogger("db_inserts")
debug_logger = logging.getLogger("normalized_insert")
```
## 🔐 REST API

The data is exposed through a secure FastAPI backend with role-based access control. API keys grant different permission levels, enabling both read-only dashboards and read-write integrations for automated systems.

### Authentication

```python
# All requests require an API key header
headers = {
    "Content-Type": "application/json",
    "api-key": "YOUR_API_KEY",
}
# Roles: admin, read-write, read-only
```
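A request built with only the standard library, assuming the API is served at a placeholder host, could look like:

```python
import urllib.request

# Placeholder host and key; point these at your deployment.
API_BASE = "http://localhost:8000"

req = urllib.request.Request(
    f"{API_BASE}/get_stock_tickers",
    headers={
        "Content-Type": "application/json",
        "api-key": "YOUR_API_KEY",
    },
    method="GET",
)
# urllib stores header names capitalized ("api-key" -> "Api-key"):
print(req.headers["Api-key"])  # YOUR_API_KEY
```

Sending the request is then a `urllib.request.urlopen(req)` call; a missing or wrong key should come back as an HTTP error from the auth layer.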
### Available Endpoints

| Endpoint | Method | Description |
|---|---|---|
| `/get_capitol_trades` | GET | Retrieve trades with filters, pagination, sorting |
| `/get_stock_tickers` | GET | List all tracked stock symbols |
| `/get_crypto_tickers` | GET | List all tracked cryptocurrency symbols |
| `/get_prices` | GET | Historical price data for any ticker |
| `/insert_trade` | POST | Add new trade records (read-write only) |
| `/batch_insert` | POST | Bulk insert multiple records (read-write only) |
### Query Examples

```
# Get trades by a specific politician
GET /get_capitol_trades?fields=politician,trade_ticker&filter=politician=Nancy Pelosi

# Get trades within a date range
GET /get_capitol_trades?fields=trade_ticker,traded_date&date_from=2024-01-01&date_to=2024-12-31

# Get trades by type with pagination
GET /get_capitol_trades?filter=type=Purchase&page=1&limit=50
```
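When calling the API from code, it is safer to let `urllib.parse.urlencode` percent-encode reserved characters (commas, `=`, spaces) than to paste raw query strings; a hypothetical helper:

```python
from urllib.parse import urlencode

def build_trades_url(base="/get_capitol_trades", **params):
    """Assemble a query URL for the trades endpoint (illustrative helper)."""
    return f"{base}?{urlencode(params)}" if params else base

url = build_trades_url(
    fields="politician,trade_ticker",
    filter="politician=Nancy Pelosi",
    page=1,
    limit=50,
)
print(url)
# /get_capitol_trades?fields=politician%2Ctrade_ticker&filter=politician%3DNancy+Pelosi&page=1&limit=50
```

The encoded form is equivalent to the readable examples above, but survives values containing spaces or `&` without corrupting the query string.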
## 📊 Technical Specifications
| Metric | Value |
|---|---|
| Python Version | 3.8+ |
| PostgreSQL Version | 14+ |
| Scrape Page Size | 100 trades per page |
| Max Retry Attempts | 3 per page |
| API Key Roles | admin, read-write, read-only |
| Ticker Categories | stock, crypto, other |
| Scheduling | Cron (configurable intervals) |
| Log Rotation | Per-component log files |
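An illustrative crontab entry for the scheduled runs (script and log paths are placeholders, not the project's actual layout):

```cron
# Run the scraper every 6 hours, appending to its per-component log
0 */6 * * * /usr/bin/python3 /home/pi/capitol/scraper.py >> /home/pi/logs/scraper.log 2>&1
```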
## ✅ Implemented Features

- ✅ Playwright-based scraper with retry logic and duplicate detection
- ✅ FinnHub and CoinMarketCap price integration
- ✅ Normalized PostgreSQL star schema
- ✅ FastAPI REST API with API key roles
- ✅ Flask dashboard with search, filtering, and pagination
- ✅ Cron-scheduled runs with per-component logging
## 🚀 Future Enhancements
- ⬜ Real-time WebSocket price feeds
- ⬜ Machine learning trade pattern detection
- ⬜ Politician portfolio tracking over time
- ⬜ Automated alert system for significant trades
- ⬜ Integration with additional data sources (SEC filings)
- ⬜ GraphQL API endpoint
- ⬜ Data export (CSV, JSON, Excel)