Files
lewismac e891cd7c15 feat: Integrate 100% indicator coverage enforcement into data collection
- Add _ensure_indicator_coverage() to verify and backfill after data collection
- Add start_indicator_coverage_monitor() background task for periodic checks
- Configure coverage monitoring with ensure_100_percent_coverage flag
- Set coverage_check_interval_hours (default: 6 hours) for monitoring frequency
- Set backfill_batch_size (default: 200) for efficient backfilling
- Call coverage check after bulk downloads, gap fills, and candle generation
- Start indicator_coverage_monitor task in continuous collection mode
- Log coverage percentages and backfill results for transparency
- Ensure all OHLCV records have complete technical indicator coverage

This integrates the new db.py indicator coverage methods into the main
data collection workflow, ensuring 100% coverage is automatically
maintained across all symbol/interval combinations.
2025-10-09 08:51:09 +01:00

1468 lines
59 KiB
Python

#!/usr/bin/env python3
"""
main.py - Complete Binance Trading Data Collection System
Main application entry point with async data collection, websocket handling, bulk
backfill orchestration, periodic gap scans, and task management.
"""
import asyncio
import logging
import signal
import sys
import json
import subprocess
import os
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any, Tuple
from contextlib import asynccontextmanager
import websockets
import aiohttp # kept for future-proofing network ops
from binance.client import Client
from binance.exceptions import BinanceAPIException
import pandas as pd
import pandas_ta as ta
from dotenv import load_dotenv
# Import our modules
from db import DatabaseManager
from utils import (
load_config, setup_logging, parse_kline_data, parse_trade_data,
calculate_technical_indicators, validate_symbol, format_timestamp
)
# Load environment variables
load_dotenv('variables.env')
# Global variables
db_manager: Optional[DatabaseManager] = None
config: Dict[str, Any] = {}
running_tasks: Dict[str, asyncio.Task] = {}
websocket_connections: Dict[str, Any] = {}
ui_process: Optional[subprocess.Popen] = None
class BinanceDataCollector:
"""Main data collection orchestrator for Binance trading data"""
def __init__(self):
self.client: Optional[Client] = None
self.logger = logging.getLogger(__name__)
self.is_collecting = False
self.websocket_collection_running = False
self.download_progress: Dict[str, Any] = {}
# Concurrency controls from env with sensible defaults
max_downloads = int(os.getenv('MAX_CONCURRENT_DOWNLOADS', '3'))
max_gap_fills = int(os.getenv('MAX_CONCURRENT_GAP_FILLS', '2'))
self._download_semaphore = asyncio.Semaphore(max_downloads)
self._gap_fill_semaphore = asyncio.Semaphore(max_gap_fills)
self.logger.info(
f"Initialized with max {max_downloads} concurrent downloads, {max_gap_fills} gap fills"
)
async def initialize(self):
"""Initialize the data collector"""
global db_manager, config
# Setup logging
setup_logging()
self.logger.info("Initializing Binance Data Collector")
# Load configuration
config = load_config()
# Optional: add defaults for new flags
coll = config.setdefault("collection", {})
coll.setdefault("default_record_from_date", "2020-01-01T00:00:00Z")
coll.setdefault("initial_full_backfill", True)
coll.setdefault("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"])
coll.setdefault("bulk_chunk_size", int(os.getenv("BULK_DOWNLOAD_BATCH_SIZE", "1000")))
coll.setdefault("tick_batch_size", int(os.getenv("TICK_BATCH_SIZE", "100")))
coll.setdefault("max_retries", int(os.getenv("MAX_RETRIES", "3")))
coll.setdefault("retry_delay", 1)
gap = config.setdefault("gap_filling", {})
gap.setdefault("enable_auto_gap_filling", True)
gap.setdefault("auto_fill_schedule_hours", 24)
gap.setdefault("enable_intelligent_averaging", True)
gap.setdefault("max_fill_attempts", int(os.getenv("MAX_RETRIES", "3")))
gap.setdefault("intervals_to_monitor", coll.get("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"]))
gap.setdefault("max_gap_size_candles", 1000)
gap.setdefault("averaging_lookback_candles", 10)
gap.setdefault("max_consecutive_empty_candles", 5)
# Add indicator coverage configuration
ind = config.setdefault("technical_indicators", {})
ind.setdefault("ensure_100_percent_coverage", True)
ind.setdefault("coverage_check_interval_hours", 6)
ind.setdefault("backfill_batch_size", 200)
self.logger.info(f"Loaded configuration for {len(config['trading_pairs'])} trading pairs")
# Initialize database
db_manager = DatabaseManager()
await db_manager.initialize()
self.logger.info("Database initialized successfully")
# Initialize Binance client (no API key needed for market data)
api_key = os.getenv('BINANCE_API_KEY')
secret_key = os.getenv('BINANCE_SECRET_KEY')
if api_key and secret_key:
self.client = Client(api_key, secret_key)
self.logger.info("Binance client initialized with API credentials")
else:
self.client = Client()
self.logger.info("Binance client initialized without API credentials (public data only)")
# ---------------------------
# Bulk backfill orchestration
# ---------------------------
async def start_bulk_download_for_all_pairs(self):
"""
Automatically launch full-history downloads for all enabled pairs,
starting from record_from_date for each pair, across all configured intervals.
"""
global config
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
if not enabled_pairs:
self.logger.warning("No enabled trading pairs found for bulk backfill")
return
# Sort by priority ascending (1 higher priority), then by symbol for stable order
enabled_pairs.sort(key=lambda p: (int(p.get('priority', 1)), p.get('symbol', '')))
# Build tasks per symbol to respect MAX_CONCURRENT_DOWNLOADS at the symbol level
tasks: List[asyncio.Task] = []
now_utc = datetime.now(timezone.utc)
for pair in enabled_pairs:
symbol = pair['symbol'].upper()
start_iso = pair.get('record_from_date') or config["collection"]["default_record_from_date"]
try:
start_dt = datetime.fromisoformat(start_iso.replace("Z", "+00:00"))
except Exception:
self.logger.warning(f"Invalid record_from_date for {symbol}: {start_iso}, falling back to default")
start_dt = datetime.fromisoformat(config["collection"]["default_record_from_date"].replace("Z", "+00:00"))
# One task per symbol to execute all intervals concurrently for that symbol
tasks.append(asyncio.create_task(
self._bulk_download_symbol_all_intervals(symbol, start_dt, now_utc),
name=f"bulk_{symbol}"
))
# Execute with graceful progress logging
self.logger.info(f"Launching bulk backfill for {len(tasks)} symbols...")
results = await asyncio.gather(*tasks, return_exceptions=True)
errors = [r for r in results if isinstance(r, Exception)]
if errors:
self.logger.error(f"Bulk backfill completed with {len(errors)} errors; see logs for details")
else:
self.logger.info("Bulk backfill completed successfully for all symbols")
async def _bulk_download_symbol_all_intervals(
self,
symbol: str,
start_date: datetime,
end_date: Optional[datetime] = None
):
"""
Launch concurrent downloads of all configured intervals for one symbol,
bounded by the download semaphore to control exchange load.
"""
global config
async with self._download_semaphore:
end_date = end_date or datetime.now(timezone.utc)
# Ensure progress structure
intervals = config.get("collection", {}).get("candle_intervals",
["1m", "5m", "15m", "1h", "4h", "1d"])
self.download_progress[symbol] = {
"status": "running",
"intervals": {iv: {"status": "pending", "records": 0} for iv in intervals},
"start_time": datetime.now(timezone.utc).isoformat()
}
# Spawn all intervals concurrently for this symbol
self.logger.info(f"Starting concurrent bulk for {symbol} on {intervals}")
interval_tasks = [
asyncio.create_task(
self._bulk_download_one_interval(symbol, interval, start_date, end_date),
name=f"bulk_{symbol}_{interval}"
)
for interval in intervals
]
results = await asyncio.gather(*interval_tasks, return_exceptions=True)
# Mark final status
if any(isinstance(r, Exception) for r in results):
self.download_progress[symbol]["status"] = "error"
self.download_progress[symbol]["error"] = "One or more intervals failed"
else:
self.download_progress[symbol]["status"] = "completed"
self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat()
async def _bulk_download_one_interval(
self,
symbol: str,
interval: str,
start_date: datetime,
end_date: datetime
):
"""Run the bulk downloader for a single symbol+interval and then compute indicators."""
# Update status
sp = self.download_progress.setdefault(symbol, {"intervals": {}})
sp["intervals"].setdefault(interval, {"status": "pending", "records": 0})
sp["intervals"][interval]["status"] = "checking"
records_count = await self._collect_historical_klines(symbol, interval, start_date, end_date)
if records_count > 0:
sp["intervals"][interval]["status"] = "calculating_indicators"
sp["intervals"][interval]["records"] = records_count
# Calculate indicators
await self._calculate_and_store_indicators(symbol, interval)
# Ensure 100% indicator coverage
await self._ensure_indicator_coverage(symbol, interval)
sp["intervals"][interval]["status"] = "completed"
self.logger.info(f"Completed {interval} data for {symbol} - {records_count} new records")
else:
sp["intervals"][interval]["status"] = "skipped_complete"
self.logger.info(f"Skipped {interval} for {symbol} - data already complete or no new records")
# ---------------------------
# Intelligent bulk downloader
# ---------------------------
async def bulk_download_historical_data(
self, symbol: str, start_date: datetime, end_date: Optional[datetime] = None,
intervals: Optional[List[str]] = None
):
"""
Bulk download historical OHLCV data from Binance with intelligent gap detection.
Only downloads data that doesn't already exist in the database.
Note: kept for API/UI compatibility; orchestration now prefers start_bulk_download_for_all_pairs.
"""
async with self._download_semaphore:
if end_date is None:
end_date = datetime.now(timezone.utc)
# Ensure timezone awareness
if start_date.tzinfo is None:
start_date = start_date.replace(tzinfo=timezone.utc)
if end_date.tzinfo is None:
end_date = end_date.replace(tzinfo=timezone.utc)
self.logger.info(f"Starting intelligent bulk download for {symbol} from {start_date} to {end_date}")
# Get intervals
if intervals is None:
intervals = config.get("collection", {}).get("candle_intervals",
["1m", "5m", "15m", "1h", "4h", "1d"])
# Initialize progress tracking
self.download_progress[symbol] = {
"status": "running",
"intervals": {},
"start_time": datetime.now(timezone.utc).isoformat()
}
for interval in intervals:
self.download_progress[symbol]["intervals"][interval] = {"status": "pending", "records": 0}
try:
# Run intervals concurrently to improve throughput for one symbol
tasks = [
asyncio.create_task(self._bulk_download_one_interval(symbol, interval, start_date, end_date),
name=f"bulk_single_{symbol}_{interval}")
for interval in intervals
]
await asyncio.gather(*tasks)
self.download_progress[symbol]["status"] = "completed"
self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat()
except Exception as e:
self.logger.error(f"Error in bulk download for {symbol}: {e}", exc_info=True)
self.download_progress[symbol]["status"] = "error"
self.download_progress[symbol]["error"] = str(e)
raise
async def _collect_historical_klines(
self, symbol: str, interval: str, start_date: datetime, end_date: datetime
) -> int:
"""
Intelligently collect historical kline data, only downloading missing ranges.
Returns:
Number of NEW records collected (not including already existing data)
"""
global db_manager
# Check if data already exists for this range
coverage_check = await db_manager.check_data_exists_for_range(
symbol, interval, start_date, end_date
)
self.logger.info(
f"Data coverage for {symbol} {interval}: "
f"{coverage_check['coverage_percent']:.2f}% "
f"({coverage_check['count']}/{coverage_check['expected_count']} records)"
)
# If coverage is complete, skip download
if coverage_check.get('is_complete'):
self.logger.info(
f"Skipping {symbol} {interval} - data already complete "
f"({coverage_check['coverage_percent']:.2f}% coverage)"
)
return 0
# Get missing time ranges that need to be downloaded
missing_ranges = await db_manager.get_missing_time_ranges(
symbol, interval, start_date, end_date
)
if not missing_ranges:
self.logger.info(f"No missing data ranges for {symbol} {interval}")
return 0
self.logger.info(
f"Found {len(missing_ranges)} missing time range(s) for {symbol} {interval}"
)
# Download each missing range
total_new_records = 0
for idx, time_range in enumerate(missing_ranges, 1):
range_start = time_range['start']
range_end = time_range['end']
self.logger.info(
f"Downloading range {idx}/{len(missing_ranges)}: "
f"{range_start} to {range_end} for {symbol} {interval}"
)
records_in_range = await self._download_time_range(
symbol, interval, range_start, range_end
)
total_new_records += records_in_range
self.logger.info(
f"Downloaded {records_in_range} records for range {idx}/{len(missing_ranges)}"
)
return total_new_records
def _calculate_chunk_end(self, start: datetime, interval: str, chunk_size: int) -> datetime:
"""Calculate the end time for a data chunk based on interval"""
if interval.endswith('m'):
minutes = int(interval[:-1])
return start + timedelta(minutes=minutes * chunk_size)
elif interval.endswith('h'):
hours = int(interval[:-1])
return start + timedelta(hours=hours * chunk_size)
elif interval.endswith('d'):
days = int(interval[:-1])
return start + timedelta(days=days * chunk_size)
elif interval.endswith('w'):
weeks = int(interval[:-1])
return start + timedelta(weeks=weeks * chunk_size)
else:
# Default to minutes
return start + timedelta(minutes=chunk_size)
@staticmethod
def _rest_kline_to_ws_event(symbol: str, interval: str, kline_row: List[Any]) -> Dict[str, Any]:
"""
Convert REST get_historical_klines row (list) to a WebSocket-style kline event
that parse_kline_data expects.
"""
# Per Binance REST klines: index meanings
# 0 open time(ms),1 open,2 high,3 low,4 close,5 volume,
# 6 close time(ms),7 quote asset volume,8 number of trades,
# 9 taker buy base asset volume,10 taker buy quote asset volume,11 ignore
return {
"e": "kline",
"E": int(kline_row[6]), # event time (approx close time)
"s": symbol.upper(),
"k": {
"t": int(kline_row[0]),
"T": int(kline_row[6]),
"s": symbol.upper(),
"i": interval,
"f": None, # first trade id
"L": None, # last trade id
"o": str(kline_row[1]),
"c": str(kline_row[4]),
"h": str(kline_row[2]),
"l": str(kline_row[3]),
"v": str(kline_row[5]),
"n": int(kline_row[8]),
"x": True, # closed candle
"q": str(kline_row[7]),
"V": None, # taker buy base vol (optional)
"Q": None, # taker buy quote vol (optional)
"B": None # ignore
}
}
async def _get_historical_klines_async(
self, symbol: str, interval: str, start_ms: int, end_ms: int, limit: int
) -> List[List[Any]]:
"""
Run python-binance get_historical_klines in a worker thread to avoid blocking the event loop.
"""
def call():
return self.client.get_historical_klines(
symbol=symbol,
interval=interval,
start_str=start_ms,
end_str=end_ms,
limit=limit
)
return await asyncio.to_thread(call)
async def _download_time_range(
self, symbol: str, interval: str, start_date: datetime, end_date: datetime
) -> int:
"""
Download data for a specific time range (internal method).
Returns:
Number of records downloaded and inserted
"""
global config, db_manager
# Resolve batch size and retry policy, prefer config then env
chunk_size = int(config.get("collection", {}).get("bulk_chunk_size",
int(os.getenv("BULK_DOWNLOAD_BATCH_SIZE", "1000"))))
max_retries = int(config.get("collection", {}).get("max_retries",
int(os.getenv("MAX_RETRIES", "3"))))
retry_delay = float(config.get("collection", {}).get("retry_delay", 1))
# Normalize time inputs
from datetime import time as dt_time
if isinstance(start_date, dt_time):
start_date = datetime.combine(datetime.now(timezone.utc).date(), start_date)
if isinstance(end_date, dt_time):
base_date = start_date.date() if isinstance(start_date, datetime) else datetime.now(timezone.utc).date()
end_date = datetime.combine(base_date, end_date)
if start_date.tzinfo is None:
start_date = start_date.replace(tzinfo=timezone.utc)
if end_date.tzinfo is None:
end_date = end_date.replace(tzinfo=timezone.utc)
# Binance API expects naive UTC timestamps (ms)
current_start = start_date.replace(tzinfo=timezone.utc)
end = end_date.replace(tzinfo=timezone.utc)
total_records = 0
consecutive_empty = 0
while current_start < end:
try:
# Calculate chunk end time based on interval
chunk_end = self._calculate_chunk_end(current_start, interval, chunk_size)
if chunk_end > end:
chunk_end = end
klines: Optional[List[List[Any]]] = None
# Try with retry policy; also handle rate-limit backoffs
for attempt in range(max_retries):
try:
klines = await self._get_historical_klines_async(
symbol=symbol,
interval=interval,
start_ms=int(current_start.timestamp() * 1000),
end_ms=int(chunk_end.timestamp() * 1000),
limit=chunk_size
)
break
except BinanceAPIException as e:
if e.code == -1003: # Rate limit
wait_time = retry_delay * (2 ** attempt)
self.logger.warning(f"Rate limit hit for {symbol} {interval}, waiting {wait_time}s")
await asyncio.sleep(wait_time)
else:
self.logger.error(f"Binance API exception for {symbol} {interval}: {e}")
if attempt == max_retries - 1:
raise
await asyncio.sleep(retry_delay)
except Exception as e:
self.logger.warning(f"Attempt {attempt + 1}/{max_retries} failed for {symbol} {interval}: {e}")
if attempt == max_retries - 1:
raise
await asyncio.sleep(retry_delay)
# No data returned; advance conservatively or terminate
if not klines or len(klines) == 0:
consecutive_empty += 1
# If multiple consecutive empty chunks, assume past available history
if consecutive_empty >= 2:
self.logger.info(f"No more data available for {symbol} {interval}; ending range loop")
break
# Otherwise, advance by one chunk and continue
current_start = chunk_end + timedelta(milliseconds=1)
await asyncio.sleep(0.05)
continue
# Reset empty counter on success
consecutive_empty = 0
# Parse and store klines
ohlcv_data: List[Dict[str, Any]] = []
for kline in klines:
try:
ws_event = self._rest_kline_to_ws_event(symbol, interval, kline)
parsed_data = parse_kline_data(ws_event)
ohlcv_data.append(parsed_data)
except Exception as e:
self.logger.error(f"Error parsing kline data: {e} | raw={kline!r}")
continue
# Batch insert to database
if ohlcv_data:
await db_manager.insert_ohlcv_batch(ohlcv_data)
total_records += len(ohlcv_data)
# Update progress
if symbol in self.download_progress and \
interval in self.download_progress[symbol].get("intervals", {}):
self.download_progress[symbol]["intervals"][interval]["records"] = total_records
self.logger.debug(
f"Stored {len(ohlcv_data)} {interval} candles for {symbol} (total: {total_records})"
)
# Update current_start based on last candle close time
if klines:
last_close_time_ms = int(klines[-1][6]) # close time ms
# Advance by 1 ms past last close to avoid duplicate fetch
current_start = datetime.utcfromtimestamp((last_close_time_ms + 1) / 1000).replace(tzinfo=timezone.utc)
else:
break
# Light delay to avoid hammering
await asyncio.sleep(0.05)
except asyncio.CancelledError:
self.logger.info(f"Download for {symbol} {interval} cancelled")
break
except Exception as e:
self.logger.error(f"Error collecting {interval} data for {symbol}: {e}", exc_info=True)
# Backoff before continuing or aborting loop
await asyncio.sleep(max(0.5, retry_delay))
# Decide: keep attempting next loop iteration to avoid a hard stop
# If persistent errors, the retries around REST will surface again
# Optional: could break here if desired, but continuing is safer for coverage
return total_records
# ---------------------------
# Technical indicators
# ---------------------------
async def _calculate_and_store_indicators(self, symbol: str, interval: str):
"""Calculate and store technical indicators for a symbol and interval"""
try:
# Check if indicators are enabled for this interval
indicator_config = config.get('technical_indicators', {})
calc_intervals = indicator_config.get('calculation_intervals',
['1m', '5m', '15m', '1h', '4h', '1d'])
if interval not in calc_intervals:
self.logger.debug(
f"Skipping indicators for {symbol} {interval} (not in calculation_intervals)"
)
return
# Get OHLCV data from database (need enough for longest indicator period)
max_period = 200 # SMA-200, etc.
ohlcv_data = await db_manager.get_ohlcv_data(symbol, interval, limit=max_period + 50)
if len(ohlcv_data) < 50:
self.logger.warning(
f"Not enough data for indicators: {symbol} {interval} ({len(ohlcv_data)} records)"
)
return
# Convert to DataFrame
df = pd.DataFrame(ohlcv_data)
df['time'] = pd.to_datetime(df['time'])
df = df.sort_values('time')
df.set_index('time', inplace=True)
# Rename columns for pandas_ta
df = df.rename(columns={
'open_price': 'open',
'high_price': 'high',
'low_price': 'low',
'close_price': 'close'
})
# Calculate technical indicators
indicators_data = calculate_technical_indicators(df, indicator_config)
# Store indicators in database
if indicators_data:
await db_manager.insert_indicators_batch(symbol, interval, indicators_data)
self.logger.info(
f"Stored {len(indicators_data)} indicator values for {symbol} {interval}"
)
except asyncio.CancelledError:
self.logger.info(f"Indicator calculation cancelled for {symbol} {interval}")
except Exception as e:
self.logger.error(
f"Error calculating indicators for {symbol} {interval}: {e}",
exc_info=True
)
async def _ensure_indicator_coverage(self, symbol: str, interval: str):
"""
Ensure 100% technical indicator coverage for a symbol/interval.
Checks coverage and backfills if needed.
"""
try:
indicator_config = config.get('technical_indicators', {})
# Check if 100% coverage enforcement is enabled
if not indicator_config.get('ensure_100_percent_coverage', True):
return
# Check current coverage
coverage = await db_manager.check_indicator_coverage(symbol, interval)
if coverage['coverage_percent'] < 99.9:
self.logger.warning(
f"Indicator coverage for {symbol} {interval}: {coverage['coverage_percent']:.2f}% - backfilling..."
)
# Backfill missing indicators
batch_size = indicator_config.get('backfill_batch_size', 200)
backfill_result = await db_manager.backfill_missing_indicators(
symbol, interval, batch_size=batch_size
)
if backfill_result['status'] == 'success':
self.logger.info(
f"Indicator backfill complete for {symbol} {interval}: "
f"{backfill_result['coverage_before']:.2f}% → {backfill_result['coverage_after']:.2f}% "
f"({backfill_result['indicators_added']} indicators added)"
)
else:
self.logger.error(
f"Indicator backfill failed for {symbol} {interval}: {backfill_result.get('error')}"
)
else:
self.logger.info(
f"Indicator coverage for {symbol} {interval}: {coverage['coverage_percent']:.2f}% ✓"
)
except Exception as e:
self.logger.error(
f"Error ensuring indicator coverage for {symbol} {interval}: {e}",
exc_info=True
)
async def start_indicator_coverage_monitor(self):
"""
Background task to periodically check and ensure 100% indicator coverage
for all symbol/interval combinations.
"""
global config
indicator_config = config.get('technical_indicators', {})
if not indicator_config.get('ensure_100_percent_coverage', True):
self.logger.info("Indicator coverage monitoring is disabled")
return
check_interval_hours = indicator_config.get('coverage_check_interval_hours', 6)
self.logger.info(
f"Starting indicator coverage monitor (every {check_interval_hours} hours)"
)
while self.is_collecting:
try:
# Get all enabled pairs
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
intervals = config.get('collection', {}).get('candle_intervals',
['1m', '5m', '15m', '1h', '4h', '1d'])
self.logger.info("Running scheduled indicator coverage check...")
for pair in enabled_pairs:
symbol = pair['symbol']
for interval in intervals:
try:
await self._ensure_indicator_coverage(symbol, interval)
# Small delay between checks
await asyncio.sleep(1)
except Exception as e:
self.logger.error(
f"Error checking coverage for {symbol} {interval}: {e}"
)
self.logger.info(
f"Indicator coverage check complete. Next check in {check_interval_hours} hours"
)
# Wait for next scheduled run
await asyncio.sleep(check_interval_hours * 3600)
except asyncio.CancelledError:
self.logger.info("Indicator coverage monitor cancelled")
break
except Exception as e:
self.logger.error(f"Error in indicator coverage monitor: {e}", exc_info=True)
await asyncio.sleep(3600) # Wait 1 hour on error
# ---------------------------
# Gap detection and filling
# ---------------------------
async def auto_fill_gaps(
self,
symbol: str,
intervals: Optional[List[str]] = None,
fill_genuine_gaps: bool = True
) -> Dict[str, Any]:
"""
Automatically fill gaps for a symbol
Args:
symbol: Trading pair symbol
intervals: List of intervals to fill (default: from config)
fill_genuine_gaps: Whether to fill genuine empty gaps with averages
Returns:
Dictionary with fill results
"""
# Acquire semaphore to limit concurrent gap fills
async with self._gap_fill_semaphore:
global config, db_manager
if intervals is None:
intervals = config.get('gap_filling', {}).get('intervals_to_monitor',
['1m', '5m', '15m', '1h', '4h', '1d'])
self.logger.info(f"Starting auto gap fill for {symbol} on intervals: {intervals}")
results: Dict[str, Any] = {
'symbol': symbol,
'intervals': {},
'total_gaps_filled': 0,
'total_genuine_filled': 0
}
try:
# Get record_from_date for this symbol
pair_config = next((p for p in config['trading_pairs'] if p['symbol'] == symbol), None)
if not pair_config:
self.logger.warning(f"Symbol {symbol} not found in config")
return results
record_from_date_iso = pair_config.get('record_from_date') or \
config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
_ = datetime.fromisoformat(record_from_date_iso.replace('Z', '+00:00')) # reserved
gap_config = config.get('gap_filling', {})
max_gap_size = gap_config.get('max_gap_size_candles', 1000)
max_attempts = int(gap_config.get('max_fill_attempts',
int(config.get("collection", {}).get("max_retries",
int(os.getenv("MAX_RETRIES", "3"))))))
averaging_lookback = gap_config.get('averaging_lookback_candles', 10)
max_empty_seq = gap_config.get('max_consecutive_empty_candles', 5)
for interval in intervals:
self.logger.info(f"Checking gaps for {symbol} {interval}")
# Detect gaps
gaps_info = await db_manager.detect_gaps(symbol, interval)
gaps = gaps_info.get('gaps', [])
interval_result = {
'gaps_found': len(gaps),
'gaps_filled': 0,
'genuine_filled': 0,
'errors': []
}
for gap in gaps:
missing_candles = gap['missing_candles']
# Skip if gap is too large
if missing_candles > max_gap_size:
self.logger.info(f"Skipping large gap: {missing_candles} candles")
interval_result['errors'].append(f"Gap too large: {missing_candles} candles")
continue
try:
gap_start = datetime.fromisoformat(gap['gap_start'])
gap_end = datetime.fromisoformat(gap['gap_end'])
self.logger.info(f"Filling gap: {gap_start} to {gap_end}")
# Attempt multiple real fills before resorting to averaging
real_filled_records = 0
for attempt in range(1, max_attempts + 1):
try:
# Small buffer around the gap to ensure edges are covered
buffered_start = gap_start - timedelta(milliseconds=1)
buffered_end = gap_end + timedelta(milliseconds=1)
added = await self._collect_historical_klines(
symbol, interval, buffered_start, buffered_end
)
real_filled_records += added
if added > 0:
# A successful fill; break early
break
except Exception as e:
# Log and continue attempts
interval_result['errors'].append(
f"Attempt {attempt} failed for gap {gap_start}->{gap_end}: {e}"
)
await asyncio.sleep(0.5 * attempt)
if real_filled_records > 0:
interval_result['gaps_filled'] += 1
results['total_gaps_filled'] += 1
self.logger.info(f"Successfully filled gap with {real_filled_records} records")
else:
# Genuine empty gap - fill with averages if enabled
if fill_genuine_gaps and gap_config.get('enable_intelligent_averaging', True):
filled = await db_manager.fill_genuine_gaps_with_averages(
symbol, interval,
max_empty_seq,
averaging_lookback
)
interval_result['genuine_filled'] += filled
results['total_genuine_filled'] += filled
# Small delay between gaps
await asyncio.sleep(0.2)
except Exception as e:
error_msg = f"Error filling gap: {str(e)}"
self.logger.error(error_msg)
interval_result['errors'].append(error_msg)
results['intervals'][interval] = interval_result
# Calculate and store indicators after any fills, then ensure coverage
if interval_result['gaps_filled'] > 0 or interval_result['genuine_filled'] > 0:
try:
await self._calculate_and_store_indicators(symbol, interval)
await self._ensure_indicator_coverage(symbol, interval)
except Exception as e:
self.logger.error(f"Error calculating indicators: {e}", exc_info=True)
self.logger.info(f"Auto gap fill completed for {symbol}: {results}")
return results
except Exception as e:
self.logger.error(f"Error in auto gap fill: {e}", exc_info=True)
results['error'] = str(e)
return results
async def start_auto_gap_fill_scheduler(self):
"""Start background task for automatic gap filling"""
global config
gap_config = config.get('gap_filling', {})
if not gap_config.get('enable_auto_gap_filling', False):
self.logger.info("Auto gap filling is disabled")
return
schedule_hours = gap_config.get('auto_fill_schedule_hours', 24)
self.logger.info(f"Starting auto gap fill scheduler (every {schedule_hours} hours)")
self.is_collecting = True
while self.is_collecting:
try:
# Get all enabled pairs
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
for pair in enabled_pairs:
symbol = pair['symbol']
self.logger.info(f"Running scheduled gap fill for {symbol}")
try:
await self.auto_fill_gaps(
symbol,
intervals=gap_config.get('intervals_to_monitor'),
fill_genuine_gaps=gap_config.get('enable_intelligent_averaging', True)
)
except Exception as e:
self.logger.error(f"Error in scheduled gap fill for {symbol}: {e}")
# Wait for next scheduled run
self.logger.info(f"Next auto gap fill in {schedule_hours} hours")
await asyncio.sleep(schedule_hours * 3600)
except asyncio.CancelledError:
self.logger.info("Auto gap fill scheduler cancelled")
break
except Exception as e:
self.logger.error(f"Error in auto gap fill scheduler: {e}", exc_info=True)
await asyncio.sleep(3600) # Wait 1 hour on error
# ---------------------------
# WebSocket continuous streams
# ---------------------------
async def start_continuous_collection(self):
"""Start continuous data collection via WebSocket"""
if self.websocket_collection_running:
self.logger.warning("WebSocket collection already running")
return
self.logger.info("Starting continuous WebSocket data collection")
self.websocket_collection_running = True
self.is_collecting = True
# Create WebSocket tasks for each enabled trading pair
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
if not enabled_pairs:
self.logger.warning("No enabled trading pairs found")
return
for pair_config in enabled_pairs:
symbol = pair_config['symbol'].lower()
# Start kline streams for configured intervals
for interval in config['collection']['candle_intervals']:
task_name = f"kline_{symbol}_{interval}"
task = asyncio.create_task(
self._websocket_kline_stream(symbol, interval),
name=task_name
)
running_tasks[task_name] = task
# Start trade stream for tick data
task_name = f"trade_{symbol}"
task = asyncio.create_task(
self._websocket_trade_stream(symbol),
name=task_name
)
running_tasks[task_name] = task
# Start auto gap fill scheduler
task_name = "auto_gap_fill_scheduler"
task = asyncio.create_task(
self.start_auto_gap_fill_scheduler(),
name=task_name
)
running_tasks[task_name] = task
# Start indicator coverage monitor
task_name = "indicator_coverage_monitor"
task = asyncio.create_task(
self.start_indicator_coverage_monitor(),
name=task_name
)
running_tasks[task_name] = task
self.logger.info(f"Started {len(running_tasks)} tasks including gap fill scheduler and indicator coverage monitor")
async def _websocket_kline_stream(self, symbol: str, interval: str):
"""WebSocket stream for kline/candlestick data"""
stream_name = f"{symbol}@kline_{interval}"
uri = f"wss://stream.binance.com:9443/ws/{stream_name}"
reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5)
ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20))
ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60))
while self.websocket_collection_running:
try:
async with websockets.connect(
uri,
ping_interval=ping_interval,
ping_timeout=ping_timeout
) as websocket:
self.logger.info(f"Connected to {stream_name}")
websocket_connections[stream_name] = websocket
async for message in websocket:
if not self.websocket_collection_running:
break
try:
data = json.loads(message)
# Validate event type and payload shape
if data.get('e') != 'kline' or 'k' not in data:
self.logger.debug(f"Ignored non-kline or malformed message on {stream_name}")
continue
# Parse kline data
ohlcv_data = parse_kline_data(data)
# Store in database
await db_manager.insert_ohlcv_single(ohlcv_data)
# Calculate indicators if kline is closed
if data['k'].get('x'):
await self._calculate_and_store_indicators(
symbol.upper(), interval
)
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON from {stream_name}")
except asyncio.CancelledError:
self.logger.info(f"Kline stream cancelled: {stream_name}")
break
except Exception as e:
self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True)
except websockets.exceptions.ConnectionClosed as e:
self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}")
except asyncio.CancelledError:
self.logger.info(f"Kline WebSocket cancelled for {stream_name}")
break
except Exception as e:
self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True)
finally:
# Clean up
if stream_name in websocket_connections:
websocket_connections.pop(stream_name, None)
if self.websocket_collection_running:
self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...")
await asyncio.sleep(reconnect_delay)
async def _websocket_trade_stream(self, symbol: str):
"""WebSocket stream for trade/tick data"""
stream_name = f"{symbol}@trade"
uri = f"wss://stream.binance.com:9443/ws/{stream_name}"
reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5)
ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20))
ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60))
while self.websocket_collection_running:
try:
async with websockets.connect(
uri,
ping_interval=ping_interval,
ping_timeout=ping_timeout
) as websocket:
self.logger.info(f"Connected to {stream_name}")
websocket_connections[stream_name] = websocket
tick_batch: List[Dict[str, Any]] = []
batch_size = int(config.get('collection', {}).get(
'tick_batch_size',
int(os.getenv("TICK_BATCH_SIZE", "100"))
))
async for message in websocket:
if not self.websocket_collection_running:
break
try:
data = json.loads(message)
if data.get('e') == 'trade':
# Parse trade data
tick_data = parse_trade_data(data)
tick_batch.append(tick_data)
# Batch insert when batch is full
if len(tick_batch) >= batch_size:
await db_manager.insert_ticks_batch(tick_batch)
tick_batch = []
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON from {stream_name}")
except asyncio.CancelledError:
self.logger.info(f"Trade stream cancelled: {stream_name}")
break
except Exception as e:
self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True)
# Insert remaining ticks
if tick_batch:
await db_manager.insert_ticks_batch(tick_batch)
except websockets.exceptions.ConnectionClosed as e:
self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}")
except asyncio.CancelledError:
self.logger.info(f"Trade WebSocket cancelled for {stream_name}")
break
except Exception as e:
self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True)
finally:
# Clean up
if stream_name in websocket_connections:
websocket_connections.pop(stream_name, None)
if self.websocket_collection_running:
self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...")
await asyncio.sleep(reconnect_delay)
async def stop_continuous_collection(self):
"""Stop continuous data collection"""
if not self.websocket_collection_running:
self.logger.warning("WebSocket collection not running")
return
self.logger.info("Stopping continuous data collection")
self.websocket_collection_running = False
self.is_collecting = False
# Cancel all running tasks
for task_name, task in list(running_tasks.items()):
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
self.logger.info(f"Cancelled task: {task_name}")
except Exception as e:
self.logger.error(f"Error cancelling task {task_name}: {e}")
# Close WebSocket connections
for conn_name, conn in list(websocket_connections.items()):
try:
await conn.close()
self.logger.info(f"Closed WebSocket: {conn_name}")
except Exception as e:
self.logger.error(f"Error closing WebSocket {conn_name}: {e}")
running_tasks.clear()
websocket_connections.clear()
self.logger.info("Continuous data collection stopped")
# ---------------------------
# Candle generation from ticks
# ---------------------------
async def generate_candles_from_ticks(
self,
symbol: str,
interval: str,
start_time: datetime,
end_time: datetime
):
"""
Generate OHLCV candles from tick data
Args:
symbol: Trading pair symbol
interval: Candle interval (e.g., '1m', '5m', '1h')
start_time: Start time for candle generation
end_time: End time for candle generation
"""
self.logger.info(f"Generating {interval} candles from ticks for {symbol}")
# Get tick data from database
ticks = await db_manager.get_tick_data(symbol, start_time, end_time)
if not ticks:
self.logger.warning(f"No tick data found for {symbol}")
return
# Convert to DataFrame
df = pd.DataFrame(ticks)
df['time'] = pd.to_datetime(df['time'])
df.set_index('time', inplace=True)
# Resample to create OHLCV data
ohlcv = df['price'].resample(interval).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last'
})
volume = df['quantity'].resample(interval).sum()
trade_count = df.resample(interval).size()
# Combine data
candles: List[Dict[str, Any]] = []
for timestamp, row in ohlcv.iterrows():
if pd.notna(row['open']): # Skip empty periods
candle = {
'time': timestamp,
'symbol': symbol,
'exchange': 'binance',
'interval': interval,
'open_price': float(row['open']),
'high_price': float(row['high']),
'low_price': float(row['low']),
'close_price': float(row['close']),
'volume': float(volume.loc[timestamp]) if timestamp in volume.index else 0.0,
'quote_volume': None,
'trade_count': int(trade_count.loc[timestamp]) if timestamp in trade_count.index else 0
}
candles.append(candle)
# Store candles in database
if candles:
await db_manager.insert_ohlcv_batch(candles)
self.logger.info(f"Generated and stored {len(candles)} candles for {symbol} {interval}")
# Calculate technical indicators and ensure coverage
await self._calculate_and_store_indicators(symbol, interval)
await self._ensure_indicator_coverage(symbol, interval)
else:
self.logger.warning(f"No candles generated for {symbol} {interval}")
# ---------------------------
# Progress and cleanup
# ---------------------------
async def get_download_progress(self, symbol: str = None) -> Dict[str, Any]:
"""Get download progress for a symbol or all symbols"""
if symbol:
return self.download_progress.get(symbol, {'status': 'not_found'})
return self.download_progress
async def cleanup(self):
"""Clean up resources"""
await self.stop_continuous_collection()
# db_manager may have a close method; guard if absent
try:
if db_manager and hasattr(db_manager, "close"):
await db_manager.close()
except Exception as e:
self.logger.warning(f"Error closing database manager: {e}")
self.logger.info("BinanceDataCollector cleanup complete")
# ---------------------------
# UI process management
# ---------------------------
def start_ui_server():
"""Start the UI server as a subprocess"""
global ui_process
logger = logging.getLogger(__name__)
try:
# Get UI configuration from environment
host = os.getenv("WEB_HOST", "0.0.0.0")
port = os.getenv("WEB_PORT", "8000")
logger.info(f"Starting UI server on {host}:{port}")
# Start ui.py as a subprocess
ui_process = subprocess.Popen(
[sys.executable, "ui.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
logger.info(f"✓ UI server started with PID: {ui_process.pid}")
# Start a thread to log UI output
import threading
def parse_log_level(line: str) -> int:
"""Parse the log level from a log line"""
# Check for standard Python logging format
if ' - ERROR - ' in line or 'ERROR:' in line:
return logging.ERROR
elif ' - WARNING - ' in line or 'WARNING:' in line or ' - WARN - ' in line:
return logging.WARNING
elif ' - DEBUG - ' in line or 'DEBUG:' in line:
return logging.DEBUG
else:
# Default to INFO for all other lines
return logging.INFO
def log_ui_output():
"""Log UI stdout messages"""
if not ui_process.stdout:
return
for line in ui_process.stdout:
line = line.rstrip()
if line:
log_level = parse_log_level(line)
logger.log(log_level, f"[UI] {line}")
def log_ui_stderr():
"""Log UI stderr messages with intelligent level detection"""
if not ui_process.stderr:
return
for line in ui_process.stderr:
line = line.rstrip()
if line:
log_level = parse_log_level(line)
logger.log(log_level, f"[UI] {line}")
stdout_thread = threading.Thread(target=log_ui_output, daemon=True)
stderr_thread = threading.Thread(target=log_ui_stderr, daemon=True)
stdout_thread.start()
stderr_thread.start()
except Exception as e:
logger.error(f"✗ Failed to start UI server: {e}")
raise
def stop_ui_server():
"""Stop the UI server subprocess"""
global ui_process
logger = logging.getLogger(__name__)
if ui_process:
try:
logger.info("Stopping UI server...")
ui_process.terminate()
try:
ui_process.wait(timeout=10)
logger.info("✓ UI server stopped gracefully")
except subprocess.TimeoutExpired:
logger.warning("UI server didn't stop gracefully, forcing...")
ui_process.kill()
ui_process.wait()
logger.info("✓ UI server forcefully stopped")
except Exception as e:
logger.error(f"✗ Error stopping UI server: {e}")
finally:
ui_process = None
else:
logger.debug("UI server process not running")
# ---------------------------
# Global signal handlers
# ---------------------------
def signal_handler(signum, frame):
"""Handle shutdown signals"""
logger = logging.getLogger(__name__)
logger.info(f"Received signal {signum}, initiating shutdown...")
# Stop UI server
stop_ui_server()
# Cancel all running tasks
for task in asyncio.all_tasks():
task.cancel()
# ---------------------------
# Main entry point
# ---------------------------
async def main():
"""Main application entry point"""
# Setup signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
collector = BinanceDataCollector()
try:
# Initialize the collector
await collector.initialize()
# Start UI server
start_ui_server()
# Optionally kick off an initial full backfill for all configured pairs
if config.get("collection", {}).get("initial_full_backfill", True):
# Launch as a background task so WebSockets can start immediately
task_name = "initial_full_backfill"
backfill_task = asyncio.create_task(collector.start_bulk_download_for_all_pairs(), name=task_name)
running_tasks[task_name] = backfill_task
# Start continuous collection (kline + trade streams) and gap scheduler
await collector.start_continuous_collection()
# Keep the application running
while collector.websocket_collection_running:
await asyncio.sleep(1)
except KeyboardInterrupt:
logging.getLogger(__name__).info("Received keyboard interrupt")
except asyncio.CancelledError:
logging.getLogger(__name__).info("Application cancelled")
except Exception as e:
logging.getLogger(__name__).error(f"Application error: {e}", exc_info=True)
finally:
# Clean shutdown
logging.getLogger(__name__).info("Initiating shutdown...")
# Stop UI server first
stop_ui_server()
# Then cleanup collector
await collector.cleanup()
logging.getLogger(__name__).info("Application shutdown complete")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutdown requested by user")
except Exception as e:
print(f"Fatal error: {e}")
sys.exit(1)