Market Ingestion
Polymarket and Kalshi data connectors and sync pipelines
The ingestion pipeline continuously fetches market data from two prediction market exchanges, normalizes it into a unified schema, and stores it in PostgreSQL. The pipeline runs on BullMQ repeatable jobs managed by the MarketSyncScheduler.
Exchange Connectors
Polymarket
Polymarket data comes through two APIs:
- Gamma API -- Market listings, metadata, categories, and event groupings
- CLOB API -- Real-time prices, order book depth, and trade history. Also provides historical price candles (`fidelity=60` for hourly, `fidelity=1440` for daily)
The connector fetches the top 30 events by volume and all markets within those events.
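The two `fidelity` values can be sketched as a candle-request builder. This is an illustrative sketch only: the base URL, endpoint path, and `market` parameter name are assumptions; the fidelity values are the ones documented above.

```typescript
// Hypothetical request builder for CLOB price candles. Endpoint path and
// parameter names are assumptions; only the fidelity values (60 = hourly,
// 1440 = daily) come from this document.
const CLOB_BASE = "https://clob.polymarket.com"; // assumed base URL

type Fidelity = 60 | 1440;

function priceHistoryUrl(tokenId: string, fidelity: Fidelity): string {
  const params = new URLSearchParams({
    market: tokenId, // CLOB token id for the market (assumed param name)
    fidelity: String(fidelity),
  });
  return `${CLOB_BASE}/prices-history?${params.toString()}`;
}
```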
Kalshi
Kalshi data comes through the Trade API v2:
- Market listings, prices, and event metadata
- Daily price candles (`period_interval=1440`) for historical data
- Order book snapshots
The connector also fetches the top 30 events by volume, but the total event count is hard-capped.
Kalshi event fetching must be capped at 200 events. Uncapped fetching overloads the shared 8 GB VM and can crash the Node process. This cap is enforced in the Kalshi connector.
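The 200-event cap can be sketched as a bounded pagination loop. The `fetchPage` helper and `EventPage` shape below are illustrative, not the real connector API; only the cap value comes from this document.

```typescript
// Sketch of the hard cap, assuming a cursor-paginated page fetcher.
// fetchPage and EventPage are hypothetical stand-ins for the connector's
// actual Kalshi Trade API v2 calls.
const KALSHI_EVENT_CAP = 200;

interface EventPage {
  events: { ticker: string }[];
  cursor?: string; // absent on the last page
}

async function fetchCappedEvents(
  fetchPage: (cursor?: string) => Promise<EventPage>,
  cap: number = KALSHI_EVENT_CAP,
): Promise<{ ticker: string }[]> {
  const all: { ticker: string }[] = [];
  let cursor: string | undefined;
  do {
    const page = await fetchPage(cursor);
    all.push(...page.events);
    cursor = page.cursor;
  } while (cursor !== undefined && all.length < cap);
  // Stop paginating and truncate so memory stays bounded on the shared VM.
  return all.slice(0, cap);
}
```

Stopping the pagination loop early (rather than truncating after the fact) is what keeps memory bounded.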
Sync Schedules
The market-sync BullMQ queue runs three types of repeatable jobs:
| Job | Frequency | Description |
|---|---|---|
| `full-sync` | Every 5 minutes | Fetches complete market listings, metadata, and volumes, and runs the `updatePriceChanges()` query (4 LATERAL JOINs for 5m/1h/6h/24h deltas) |
| `price-sync` | Every 60 seconds | Fetches current prices for all ~4,500 active markets. Only writes `MarketPriceSnapshot` rows when the price delta exceeds 0.0001 |
| `orderbook-sync` | Every 60 seconds | Fetches order book depth for active markets |
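Registering the three repeatable jobs could look like the following sketch. The minimal `RepeatableQueue` interface stands in for a BullMQ `Queue` so the example stays self-contained; the job payloads and function name are assumptions, while the job names and intervals come from the table above.

```typescript
// Minimal structural stand-in for a bullmq Queue (assumption for the sketch).
interface RepeatableQueue {
  add(
    name: string,
    data: Record<string, unknown>,
    opts: { repeat: { every: number } },
  ): Promise<unknown>;
}

// Intervals from the schedule table above, in milliseconds.
const SYNC_JOBS = [
  { name: "full-sync", every: 5 * 60_000 },
  { name: "price-sync", every: 60_000 },
  { name: "orderbook-sync", every: 60_000 },
] as const;

async function registerSyncJobs(queue: RepeatableQueue): Promise<void> {
  for (const job of SYNC_JOBS) {
    // BullMQ-style repeatable jobs re-enqueue themselves every `every` ms
    // until the repeat entry is removed.
    await queue.add(job.name, {}, { repeat: { every: job.every } });
  }
}
```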
Price-Sync Memory Optimizations
The price-sync job processes ~4,500 markets every 60 seconds. Two critical optimizations prevent the Node process from growing past 5 GB:
- Delta-only snapshots -- `MarketPriceSnapshot` rows are inserted only when `|price_change| > 0.0001`, avoiding thousands of redundant writes per cycle
- Deferred price-change queries -- The expensive `updatePriceChanges()` query (computing 5m/1h/6h/24h changes across all markets) runs only during full-sync (every 5 minutes), not on every price-sync cycle
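The delta-only gate is simple enough to sketch directly. The function name is illustrative; the threshold is the one documented above.

```typescript
// Sketch of the delta-only snapshot gate (function name is illustrative).
const PRICE_DELTA_THRESHOLD = 0.0001;

function shouldWriteSnapshot(prevPrice: number | null, newPrice: number): boolean {
  // A market with no prior snapshot always gets its first one.
  if (prevPrice === null) return true;
  // Only a move larger than the threshold warrants a new row.
  return Math.abs(newPrice - prevPrice) > PRICE_DELTA_THRESHOLD;
}
```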
Normalization
The NormalizerService converts exchange-specific data into the unified Market model:
Polymarket Raw Data ──┐
                      ├──▶ NormalizerService ──▶ Prisma Market Record
Kalshi Raw Data ──────┘

Key normalizations:
- Slug generation -- Consistent URL-safe slugs from market titles
- Category mapping -- Exchange-specific categories mapped to Vezta categories
- Price format -- Both exchanges normalize to `yesPrice` and `noPrice` as `Decimal(10, 6)`
- Volume -- Standardized to USD with `Decimal(18, 2)` precision
- Timestamps -- `startedAt` uses the exchange's creation time (Polymarket `createdAt`, Kalshi `open_time`); `createdAt` is when Vezta ingested the market
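The slug-generation step above can be sketched as follows. The exact rules inside NormalizerService may differ; this just shows the general URL-safe shape.

```typescript
// Sketch of URL-safe slug generation from a market title.
// The real NormalizerService rules may differ in detail.
function toSlug(title: string): string {
  return title
    .toLowerCase()
    .normalize("NFKD")                // split accented chars into base + mark
    .replace(/[\u0300-\u036f]/g, "")  // drop the combining marks
    .replace(/[^a-z0-9\s-]/g, "")     // strip punctuation ($, ?, etc.)
    .trim()
    .replace(/[\s-]+/g, "-");         // collapse whitespace and dash runs
}
```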
Price History Backfill
A separate `price-history-backfill` job (every 60 seconds) populates historical price data for charts:
| Exchange | Strategy | Rate Limiting |
|---|---|---|
| Polymarket | Hourly candles (30 days) merged with daily candles (full history) | 5 parallel requests, 500ms delay |
| Kalshi | Daily candles only (hourly returns 400 for old markets) | Sequential, 1 market at a time, 3s delay |
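The Polymarket pacing (5 parallel requests, then a 500 ms pause) can be sketched as a batched mapper. The helper name and shape are assumptions; the batch width and delay come from the table above.

```typescript
// Sketch of batched, rate-limited fetching: up to batchSize requests in
// flight at once, with a pause between batches. Helper name is assumed.
async function mapInBatches<T, R>(
  items: T[],
  batchSize: number,
  delayMs: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    // Run one batch fully in parallel, preserving input order.
    results.push(...(await Promise.all(batch.map(fn))));
    const moreRemaining = i + batchSize < items.length;
    if (moreRemaining) await new Promise((r) => setTimeout(r, delayMs));
  }
  return results;
}

// Illustrative usage for the Polymarket backfill settings:
//   await mapInBatches(markets, 5, 500, backfillMarket);
// The Kalshi side would be mapInBatches(markets, 1, 3000, backfillMarket).
```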
The backfill only inserts data points older than the earliest existing snapshot to avoid duplicates. Markets are marked priceHistoryBackfilled = true once backfill completes.
Kalshi's API returns price data in `price.close_dollars` (not `price.close`). The connector handles both field names for compatibility.
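The two safeguards above (the both-fields fallback and the insert-only-older-points rule) can be sketched together. The candle type and field name `end_period_ts` are illustrative assumptions, not the actual schema.

```typescript
// Illustrative candle shape; field names other than price.close and
// price.close_dollars are assumptions.
interface RawKalshiCandle {
  end_period_ts: number; // unix seconds (assumed field name)
  price: { close?: number; close_dollars?: number };
}

function closeOf(candle: RawKalshiCandle): number | undefined {
  // Prefer close_dollars; fall back to close for older responses.
  return candle.price.close_dollars ?? candle.price.close;
}

function pointsToInsert(
  candles: RawKalshiCandle[],
  earliestSnapshotTs: number | null, // unix seconds; null = no snapshots yet
): RawKalshiCandle[] {
  if (earliestSnapshotTs === null) return candles;
  // Strictly older than the earliest existing snapshot avoids duplicates
  // with rows the real-time price-sync already wrote.
  return candles.filter((c) => c.end_period_ts < earliestSnapshotTs);
}
```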
Chart Data
The `getChartData` endpoint returns OHLC data aggregated via SQL `date_bin()`:
| Timeframe | Bucket Size | Data Source |
|---|---|---|
| 6H | 1 minute | Real-time price-sync snapshots |
| 1D | 5 minutes | Real-time price-sync snapshots |
| 1W | 30 minutes | Backfilled + real-time snapshots |
| 1M | 1 hour | Backfilled + real-time snapshots |
| ALL | 1 day | Backfilled snapshots (queries from epoch) |
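The timeframe-to-bucket mapping and the `date_bin()` aggregation can be sketched as below. The bucket sizes come from the table above; the table and column names in the SQL are illustrative, not the actual schema.

```typescript
// Bucket sizes from the timeframe table above.
const BUCKET_BY_TIMEFRAME = {
  "6H": "1 minute",
  "1D": "5 minutes",
  "1W": "30 minutes",
  "1M": "1 hour",
  "ALL": "1 day",
} as const;

type Timeframe = keyof typeof BUCKET_BY_TIMEFRAME;

// Sketch of the aggregation query; table/column names are assumptions.
function chartDataSql(timeframe: Timeframe): string {
  const bucket = BUCKET_BY_TIMEFRAME[timeframe];
  // date_bin() snaps each snapshot timestamp into a fixed-width bucket
  // anchored at the epoch; per-bucket aggregates yield OHLC-style rows.
  return `
    SELECT
      date_bin('${bucket}', "createdAt", TIMESTAMP '1970-01-01') AS bucket,
      (array_agg(price ORDER BY "createdAt" ASC))[1]  AS open,
      max(price)                                      AS high,
      min(price)                                      AS low,
      (array_agg(price ORDER BY "createdAt" DESC))[1] AS close
    FROM "MarketPriceSnapshot"
    WHERE "marketId" = $1 AND "createdAt" >= $2
    GROUP BY bucket
    ORDER BY bucket`;
}
```

For the ALL timeframe, the `$2` lower bound would simply be the epoch, matching the "queries from epoch" note in the table.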