Caching & Queues
Redis cache strategy and BullMQ job scheduling
Vezta uses Redis for two purposes: application-level caching of frequently accessed data, and BullMQ job queues for all background processing. Both are backed by the same Redis instance running as a Docker container (vezta-redis).
Redis Cache
The RedisCacheService (in src/common/cache/) provides a global caching layer available to all modules:
| Cache Category | TTL | Contents |
|---|---|---|
| Market data | Short (seconds) | Current prices, volumes, market metadata |
| Leaderboard | Minutes | Top trader rankings by period and category |
| Price snapshots | Minutes | Recent price history for sparklines |
| User sessions | Token lifetime | JWT validation data |
Cache invalidation happens when the underlying data changes -- for example, when a price-sync job writes new prices, the corresponding cache entries are refreshed.
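The read path follows the standard cache-aside pattern. The sketch below is illustrative, not the actual `RedisCacheService` code: the `CacheStore` interface, `MemoryStore` stand-in, and `getOrSet` helper are hypothetical names, and the real service wraps a Redis client rather than an in-memory map.

```typescript
// Hypothetical sketch of the cache-aside pattern behind RedisCacheService.
// The real service talks to Redis; MemoryStore is a stand-in for illustration.
interface CacheStore {
  get(key: string): Promise<string | null>;
  set(key: string, value: string, ttlMs: number): Promise<void>;
}

// In-memory stand-in for Redis with TTL-based expiry.
class MemoryStore implements CacheStore {
  private entries = new Map<string, { value: string; expiresAt: number }>();

  async get(key: string): Promise<string | null> {
    const entry = this.entries.get(key);
    if (!entry || entry.expiresAt < Date.now()) return null;
    return entry.value;
  }

  async set(key: string, value: string, ttlMs: number): Promise<void> {
    this.entries.set(key, { value, expiresAt: Date.now() + ttlMs });
  }
}

// Read-through helper: return the cached value on a hit; on a miss,
// run the loader, cache the result with a short TTL, and return it.
async function getOrSet<T>(
  store: CacheStore,
  key: string,
  ttlMs: number,
  loader: () => Promise<T>,
): Promise<T> {
  const hit = await store.get(key);
  if (hit !== null) return JSON.parse(hit) as T;
  const value = await loader();
  await store.set(key, JSON.stringify(value), ttlMs);
  return value;
}
```

Invalidation then amounts to overwriting (or deleting) the relevant keys when a sync job writes fresh data, so the next read repopulates the cache.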
BullMQ Queues
BullMQ handles all background processing through named queues. Each queue is registered per-module via BullModule.registerQueue() in the module's imports array -- not at the application level.
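As a sketch of that per-module wiring (assuming the `@nestjs/bullmq` NestJS wrapper; the import path would be `@nestjs/bull` if the older package is in use, and this `MarketModule` excerpt is illustrative rather than the actual module file):

```typescript
// Hypothetical MarketModule excerpt -- a sketch of per-module queue
// registration, assuming the @nestjs/bullmq wrapper.
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    // Each module registers only the queues it owns -- queue registration
    // does not happen at the application level.
    BullModule.registerQueue(
      { name: 'market-sync' },
      { name: 'price-history-backfill' },
    ),
  ],
})
export class MarketModule {}
```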
Active Queues
| Queue | Module | Purpose |
|---|---|---|
| market-sync | Market | Full-sync, price-sync, orderbook-sync on repeatable schedules |
| price-history-backfill | Market | Historical candle data backfill for charts |
| signal-ingestion | Monitor | News connector jobs (RSS, GDELT, NewsData, CryptoNews, Telegram, X) |
| signal-detector | Monitor | Market-data-based signal detection (whale moves, smart money) |
| top-trader | Leaderboard | Leaderboard sync (every 10 min) and trader enrichment (every 30 min) |
| spread-scanner | Arbitrage | Cross-platform price discrepancy detection |
| leaderboard-sync | Leaderboard | Aggregate leaderboard data refresh |
Job Scheduling Pattern
Modules that need repeatable jobs implement OnModuleInit and follow a consistent pattern:
```typescript
async onModuleInit() {
  // 1. Clean old repeatable jobs to avoid duplicates
  const existing = await this.queue.getRepeatableJobs();
  for (const job of existing) {
    await this.queue.removeRepeatableByKey(job.key);
  }

  // 2. Register new repeatable jobs
  await this.queue.add('full-sync', {}, {
    repeat: { every: 5 * 60 * 1000 }, // every 5 minutes
  });
  await this.queue.add('price-sync', {}, {
    repeat: { every: 60 * 1000 }, // every 60 seconds
  });

  // 3. Optionally trigger an immediate run
  await this.queue.add('full-sync', {}, {
    jobId: 'initial-sync',
  });
}
```
Schedulers
| Scheduler | Jobs Registered |
|---|---|
| MarketSyncScheduler | full-sync (5min), price-sync (60s), orderbook-sync (60s) |
| SignalIngestionScheduler | RSS (5min), GDELT (30min), NewsData (15min), CryptoNews (10min), Telegram (15min), X (60min), cleanup (daily) |
| SpreadScannerScheduler | Arbitrage spread scanning |
| NewsSyncScheduler | News article synchronization |
| WalletMonitorScheduler | Tracked wallet activity polling |
| RewardsScheduler | Points calculation and mission progress |
| SniperMonitorScheduler | Sniper order price monitoring |
| CounterTradeResetScheduler | Daily PnL and trade count reset at midnight |
Processors
Each queue has a corresponding processor class decorated with @Processor('queue-name') that handles job execution:
```typescript
@Processor('market-sync')
export class MarketSyncProcessor {
  @Process('full-sync')
  async handleFullSync(job: Job) {
    // Fetch markets, normalize, upsert...
  }

  @Process('price-sync')
  async handlePriceSync(job: Job) {
    // Fetch prices, write snapshots...
  }
}
```
Redis Configuration
BullModule.forRoot() in app.module.ts uses REDIS_HOST and REDIS_PORT environment variables (not REDIS_URL). These default to localhost:6379 if unset. In Docker, REDIS_HOST must be set to the container name (e.g., vezta-redis).
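A sketch of that wiring (this `app.module.ts` excerpt is illustrative; the option is named `connection` in `@nestjs/bullmq`, whereas the older `@nestjs/bull` wrapper calls it `redis`):

```typescript
// Hypothetical app.module.ts excerpt -- a sketch of feeding REDIS_HOST /
// REDIS_PORT into BullModule.forRoot(), assuming @nestjs/bullmq.
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: process.env.REDIS_HOST ?? 'localhost', // 'vezta-redis' in Docker
        port: Number(process.env.REDIS_PORT ?? 6379),
      },
    }),
  ],
})
export class AppModule {}
```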
Connection Details
| Environment | REDIS_HOST | REDIS_PORT |
|---|---|---|
| Local development | localhost | 6379 |
| Docker (production) | vezta-redis | 6379 |
Both the cache and BullMQ queues share the same Redis instance. The Redis container has health checks configured (redis-cli ping, 10s interval, 3 retries), and the API container depends on Redis being healthy before starting.
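The health-check and startup-ordering arrangement described above might look like this in a compose file (a sketch: only `vezta-redis` and the check parameters come from this document; the `api` service name and the Redis image tag are assumptions):

```yaml
# Sketch of the compose wiring; service/image names other than
# vezta-redis are assumptions for illustration.
services:
  vezta-redis:
    image: redis:7
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      retries: 3
  api:
    build: .
    environment:
      REDIS_HOST: vezta-redis
      REDIS_PORT: "6379"
    depends_on:
      vezta-redis:
        condition: service_healthy
```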
Alternative: setInterval
Some modules use native setInterval for simpler polling instead of BullMQ:
- `copy-trade` module -- Polls for new trades from target wallets
- `counter-trade` module -- Monitors target wallet activity
For new schedulers, prefer BullMQ over setInterval for better reliability, retry handling, and observability.
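For contrast, the setInterval style reduces to a sketch like the following. The `WalletPoller` class and its names are hypothetical, not the actual copy-trade or counter-trade code; it only illustrates the start/stop lifecycle those modules tie to NestJS hooks.

```typescript
// Hypothetical sketch of the setInterval polling style. In the real modules,
// start() would run in onModuleInit() and stop() in onModuleDestroy().
class WalletPoller {
  private timer?: ReturnType<typeof setInterval>;

  constructor(
    private readonly pollFn: () => Promise<void>,
    private readonly intervalMs: number,
  ) {}

  start(): void {
    this.timer = setInterval(() => {
      // Swallow errors so one failed poll does not stop the loop.
      // With BullMQ, retry/backoff handling like this comes built in,
      // which is one reason it is preferred for new schedulers.
      this.pollFn().catch(() => {});
    }, this.intervalMs);
  }

  stop(): void {
    if (this.timer) clearInterval(this.timer);
  }
}
```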