#!/usr/bin/env python3 """ main.py - Complete Binance Trading Data Collection System Main application entry point with async data collection, websocket handling, bulk backfill orchestration, periodic gap scans, and task management. """ import asyncio import logging import signal import sys import json import subprocess import os from datetime import datetime, timedelta, timezone from typing import Dict, List, Optional, Any, Tuple from contextlib import asynccontextmanager import websockets import aiohttp # kept for future-proofing network ops from binance.client import Client from binance.exceptions import BinanceAPIException import pandas as pd import pandas_ta as ta from dotenv import load_dotenv # Import our modules from db import DatabaseManager from utils import ( load_config, setup_logging, parse_kline_data, parse_trade_data, calculate_technical_indicators, validate_symbol, format_timestamp ) # Load environment variables load_dotenv('variables.env') # Global variables db_manager: Optional[DatabaseManager] = None config: Dict[str, Any] = {} running_tasks: Dict[str, asyncio.Task] = {} websocket_connections: Dict[str, Any] = {} ui_process: Optional[subprocess.Popen] = None class BinanceDataCollector: """Main data collection orchestrator for Binance trading data""" def __init__(self): self.client: Optional[Client] = None self.logger = logging.getLogger(__name__) self.is_collecting = False self.websocket_collection_running = False self.download_progress: Dict[str, Any] = {} # Concurrency controls from env with sensible defaults max_downloads = int(os.getenv('MAX_CONCURRENT_DOWNLOADS', '3')) max_gap_fills = int(os.getenv('MAX_CONCURRENT_GAP_FILLS', '2')) self._download_semaphore = asyncio.Semaphore(max_downloads) self._gap_fill_semaphore = asyncio.Semaphore(max_gap_fills) self.logger.info( f"Initialized with max {max_downloads} concurrent downloads, {max_gap_fills} gap fills" ) async def initialize(self): """Initialize the data collector""" global db_manager, config # Setup logging setup_logging() self.logger.info("Initializing Binance Data Collector") # Load configuration config = load_config() # Optional: add defaults for new flags coll = config.setdefault("collection", {}) coll.setdefault("default_record_from_date", "2020-01-01T00:00:00Z") coll.setdefault("initial_full_backfill", True) coll.setdefault("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"]) coll.setdefault("bulk_chunk_size", int(os.getenv("BULK_DOWNLOAD_BATCH_SIZE", "1000"))) coll.setdefault("tick_batch_size", int(os.getenv("TICK_BATCH_SIZE", "100"))) coll.setdefault("max_retries", int(os.getenv("MAX_RETRIES", "3"))) coll.setdefault("retry_delay", 1) gap = config.setdefault("gap_filling", {}) gap.setdefault("enable_auto_gap_filling", True) gap.setdefault("auto_fill_schedule_hours", 24) gap.setdefault("enable_intelligent_averaging", True) gap.setdefault("max_fill_attempts", int(os.getenv("MAX_RETRIES", "3"))) gap.setdefault("intervals_to_monitor", coll.get("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"])) gap.setdefault("max_gap_size_candles", 1000) gap.setdefault("averaging_lookback_candles", 10) gap.setdefault("max_consecutive_empty_candles", 5) self.logger.info(f"Loaded configuration for {len(config['trading_pairs'])} trading pairs") # Initialize database db_manager = DatabaseManager() await db_manager.initialize() self.logger.info("Database initialized successfully") # Initialize Binance client (no API key needed for market data) api_key = os.getenv('BINANCE_API_KEY') secret_key = os.getenv('BINANCE_SECRET_KEY') if api_key and secret_key: self.client = Client(api_key, secret_key) self.logger.info("Binance client initialized with API credentials") else: self.client = Client() self.logger.info("Binance client initialized without API credentials (public data only)") # --------------------------- # Bulk backfill orchestration # --------------------------- async def start_bulk_download_for_all_pairs(self): """ Automatically launch full-history downloads for all enabled pairs, starting from record_from_date for each pair, across all configured intervals. """ global config enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)] if not enabled_pairs: self.logger.warning("No enabled trading pairs found for bulk backfill") return # Sort by priority ascending (1 higher priority), then by symbol for stable order enabled_pairs.sort(key=lambda p: (int(p.get('priority', 1)), p.get('symbol', ''))) # Build tasks per symbol to respect MAX_CONCURRENT_DOWNLOADS at the symbol level tasks: List[asyncio.Task] = [] now_utc = datetime.now(timezone.utc) for pair in enabled_pairs: symbol = pair['symbol'].upper() start_iso = pair.get('record_from_date') or config["collection"]["default_record_from_date"] try: start_dt = datetime.fromisoformat(start_iso.replace("Z", "+00:00")) except Exception: self.logger.warning(f"Invalid record_from_date for {symbol}: {start_iso}, falling back to default") start_dt = datetime.fromisoformat(config["collection"]["default_record_from_date"].replace("Z", "+00:00")) # One task per symbol to execute all intervals concurrently for that symbol tasks.append(asyncio.create_task( self._bulk_download_symbol_all_intervals(symbol, start_dt, now_utc), name=f"bulk_{symbol}" )) # Execute with graceful progress logging self.logger.info(f"Launching bulk backfill for {len(tasks)} symbols...") results = await asyncio.gather(*tasks, return_exceptions=True) errors = [r for r in results if isinstance(r, Exception)] if errors: self.logger.error(f"Bulk backfill completed with {len(errors)} errors; see logs for details") else: self.logger.info("Bulk backfill completed successfully for all symbols") async def _bulk_download_symbol_all_intervals( self, symbol: str, start_date: datetime, end_date: Optional[datetime] = None ): """ Launch concurrent downloads of all configured intervals for one symbol, bounded by the download semaphore to control exchange load. """ global config async with self._download_semaphore: end_date = end_date or datetime.now(timezone.utc) # Ensure progress structure intervals = config.get("collection", {}).get("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"]) self.download_progress[symbol] = { "status": "running", "intervals": {iv: {"status": "pending", "records": 0} for iv in intervals}, "start_time": datetime.now(timezone.utc).isoformat() } # Spawn all intervals concurrently for this symbol self.logger.info(f"Starting concurrent bulk for {symbol} on {intervals}") interval_tasks = [ asyncio.create_task( self._bulk_download_one_interval(symbol, interval, start_date, end_date), name=f"bulk_{symbol}_{interval}" ) for interval in intervals ] results = await asyncio.gather(*interval_tasks, return_exceptions=True) # Mark final status if any(isinstance(r, Exception) for r in results): self.download_progress[symbol]["status"] = "error" self.download_progress[symbol]["error"] = "One or more intervals failed" else: self.download_progress[symbol]["status"] = "completed" self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat() async def _bulk_download_one_interval( self, symbol: str, interval: str, start_date: datetime, end_date: datetime ): """Run the bulk downloader for a single symbol+interval and then compute indicators.""" # Update status sp = self.download_progress.setdefault(symbol, {"intervals": {}}) sp["intervals"].setdefault(interval, {"status": "pending", "records": 0}) sp["intervals"][interval]["status"] = "checking" records_count = await self._collect_historical_klines(symbol, interval, start_date, end_date) if records_count > 0: sp["intervals"][interval]["status"] = "calculating_indicators" sp["intervals"][interval]["records"] = records_count await self._calculate_and_store_indicators(symbol, interval) sp["intervals"][interval]["status"] = "completed" self.logger.info(f"Completed {interval} data for {symbol} - {records_count} new records") else: sp["intervals"][interval]["status"] = "skipped_complete" self.logger.info(f"Skipped {interval} for {symbol} - data already complete or no new records") # --------------------------- # Intelligent bulk downloader # --------------------------- async def bulk_download_historical_data( self, symbol: str, start_date: datetime, end_date: Optional[datetime] = None, intervals: Optional[List[str]] = None ): """ Bulk download historical OHLCV data from Binance with intelligent gap detection. Only downloads data that doesn't already exist in the database. Note: kept for API/UI compatibility; orchestration now prefers start_bulk_download_for_all_pairs. """ async with self._download_semaphore: if end_date is None: end_date = datetime.now(timezone.utc) # Ensure timezone awareness if start_date.tzinfo is None: start_date = start_date.replace(tzinfo=timezone.utc) if end_date.tzinfo is None: end_date = end_date.replace(tzinfo=timezone.utc) self.logger.info(f"Starting intelligent bulk download for {symbol} from {start_date} to {end_date}") # Get intervals if intervals is None: intervals = config.get("collection", {}).get("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"]) # Initialize progress tracking self.download_progress[symbol] = { "status": "running", "intervals": {}, "start_time": datetime.now(timezone.utc).isoformat() } for interval in intervals: self.download_progress[symbol]["intervals"][interval] = {"status": "pending", "records": 0} try: # Run intervals concurrently to improve throughput for one symbol tasks = [ asyncio.create_task(self._bulk_download_one_interval(symbol, interval, start_date, end_date), name=f"bulk_single_{symbol}_{interval}") for interval in intervals ] await asyncio.gather(*tasks) self.download_progress[symbol]["status"] = "completed" self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat() except Exception as e: self.logger.error(f"Error in bulk download for {symbol}: {e}", exc_info=True) self.download_progress[symbol]["status"] = "error" self.download_progress[symbol]["error"] = str(e) raise async def _collect_historical_klines( self, symbol: str, interval: str, start_date: datetime, end_date: datetime ) -> int: """ Intelligently collect historical kline data, only downloading missing ranges. Returns: Number of NEW records collected (not including already existing data) """ global db_manager # Check if data already exists for this range coverage_check = await db_manager.check_data_exists_for_range( symbol, interval, start_date, end_date ) self.logger.info( f"Data coverage for {symbol} {interval}: " f"{coverage_check['coverage_percent']:.2f}% " f"({coverage_check['count']}/{coverage_check['expected_count']} records)" ) # If coverage is complete, skip download if coverage_check.get('is_complete'): self.logger.info( f"Skipping {symbol} {interval} - data already complete " f"({coverage_check['coverage_percent']:.2f}% coverage)" ) return 0 # Get missing time ranges that need to be downloaded missing_ranges = await db_manager.get_missing_time_ranges( symbol, interval, start_date, end_date ) if not missing_ranges: self.logger.info(f"No missing data ranges for {symbol} {interval}") return 0 self.logger.info( f"Found {len(missing_ranges)} missing time range(s) for {symbol} {interval}" ) # Download each missing range total_new_records = 0 for idx, time_range in enumerate(missing_ranges, 1): range_start = time_range['start'] range_end = time_range['end'] self.logger.info( f"Downloading range {idx}/{len(missing_ranges)}: " f"{range_start} to {range_end} for {symbol} {interval}" ) records_in_range = await self._download_time_range( symbol, interval, range_start, range_end ) total_new_records += records_in_range self.logger.info( f"Downloaded {records_in_range} records for range {idx}/{len(missing_ranges)}" ) return total_new_records def _calculate_chunk_end(self, start: datetime, interval: str, chunk_size: int) -> datetime: """Calculate the end time for a data chunk based on interval""" if interval.endswith('m'): minutes = int(interval[:-1]) return start + timedelta(minutes=minutes * chunk_size) elif interval.endswith('h'): hours = int(interval[:-1]) return start + timedelta(hours=hours * chunk_size) elif interval.endswith('d'): days = int(interval[:-1]) return start + timedelta(days=days * chunk_size) elif interval.endswith('w'): weeks = int(interval[:-1]) return start + timedelta(weeks=weeks * chunk_size) else: # Default to minutes return start + timedelta(minutes=chunk_size) @staticmethod def _rest_kline_to_ws_event(symbol: str, interval: str, kline_row: List[Any]) -> Dict[str, Any]: """ Convert REST get_historical_klines row (list) to a WebSocket-style kline event that parse_kline_data expects. """ # Per Binance REST klines: index meanings # 0 open time(ms),1 open,2 high,3 low,4 close,5 volume, # 6 close time(ms),7 quote asset volume,8 number of trades, # 9 taker buy base asset volume,10 taker buy quote asset volume,11 ignore return { "e": "kline", "E": int(kline_row[6]), # event time (approx close time) "s": symbol.upper(), "k": { "t": int(kline_row[0]), "T": int(kline_row[6]), "s": symbol.upper(), "i": interval, "f": None, # first trade id "L": None, # last trade id "o": str(kline_row[1]), "c": str(kline_row[4]), "h": str(kline_row[2]), "l": str(kline_row[3]), "v": str(kline_row[5]), "n": int(kline_row[8]), "x": True, # closed candle "q": str(kline_row[7]), "V": None, # taker buy base vol (optional) "Q": None, # taker buy quote vol (optional) "B": None # ignore } } async def _get_historical_klines_async( self, symbol: str, interval: str, start_ms: int, end_ms: int, limit: int ) -> List[List[Any]]: """ Run python-binance get_historical_klines in a worker thread to avoid blocking the event loop. """ def call(): return self.client.get_historical_klines( symbol=symbol, interval=interval, start_str=start_ms, end_str=end_ms, limit=limit ) return await asyncio.to_thread(call) async def _download_time_range( self, symbol: str, interval: str, start_date: datetime, end_date: datetime ) -> int: """ Download data for a specific time range (internal method). Returns: Number of records downloaded and inserted """ global config, db_manager # Resolve batch size and retry policy, prefer config then env chunk_size = int(config.get("collection", {}).get("bulk_chunk_size", int(os.getenv("BULK_DOWNLOAD_BATCH_SIZE", "1000")))) max_retries = int(config.get("collection", {}).get("max_retries", int(os.getenv("MAX_RETRIES", "3")))) retry_delay = float(config.get("collection", {}).get("retry_delay", 1)) # Normalize time inputs from datetime import time as dt_time if isinstance(start_date, dt_time): start_date = datetime.combine(datetime.now(timezone.utc).date(), start_date) if isinstance(end_date, dt_time): base_date = start_date.date() if isinstance(start_date, datetime) else datetime.now(timezone.utc).date() end_date = datetime.combine(base_date, end_date) if start_date.tzinfo is None: start_date = start_date.replace(tzinfo=timezone.utc) if end_date.tzinfo is None: end_date = end_date.replace(tzinfo=timezone.utc) # Binance API expects naive UTC timestamps (ms) current_start = start_date.replace(tzinfo=timezone.utc) end = end_date.replace(tzinfo=timezone.utc) total_records = 0 consecutive_empty = 0 while current_start < end: try: # Calculate chunk end time based on interval chunk_end = self._calculate_chunk_end(current_start, interval, chunk_size) if chunk_end > end: chunk_end = end klines: Optional[List[List[Any]]] = None # Try with retry policy; also handle rate-limit backoffs for attempt in range(max_retries): try: klines = await self._get_historical_klines_async( symbol=symbol, interval=interval, start_ms=int(current_start.timestamp() * 1000), end_ms=int(chunk_end.timestamp() * 1000), limit=chunk_size ) break except BinanceAPIException as e: if e.code == -1003: # Rate limit wait_time = retry_delay * (2 ** attempt) self.logger.warning(f"Rate limit hit for {symbol} {interval}, waiting {wait_time}s") await asyncio.sleep(wait_time) else: self.logger.error(f"Binance API exception for {symbol} {interval}: {e}") if attempt == max_retries - 1: raise await asyncio.sleep(retry_delay) except Exception as e: self.logger.warning(f"Attempt {attempt + 1}/{max_retries} failed for {symbol} {interval}: {e}") if attempt == max_retries - 1: raise await asyncio.sleep(retry_delay) # No data returned; advance conservatively or terminate if not klines or len(klines) == 0: consecutive_empty += 1 # If multiple consecutive empty chunks, assume past available history if consecutive_empty >= 2: self.logger.info(f"No more data available for {symbol} {interval}; ending range loop") break # Otherwise, advance by one chunk and continue current_start = chunk_end + timedelta(milliseconds=1) await asyncio.sleep(0.05) continue # Reset empty counter on success consecutive_empty = 0 # Parse and store klines ohlcv_data: List[Dict[str, Any]] = [] for kline in klines: try: ws_event = self._rest_kline_to_ws_event(symbol, interval, kline) parsed_data = parse_kline_data(ws_event) ohlcv_data.append(parsed_data) except Exception as e: self.logger.error(f"Error parsing kline data: {e} | raw={kline!r}") continue # Batch insert to database if ohlcv_data: await db_manager.insert_ohlcv_batch(ohlcv_data) total_records += len(ohlcv_data) # Update progress if symbol in self.download_progress and \ interval in self.download_progress[symbol].get("intervals", {}): self.download_progress[symbol]["intervals"][interval]["records"] = total_records self.logger.debug( f"Stored {len(ohlcv_data)} {interval} candles for {symbol} (total: {total_records})" ) # Update current_start based on last candle close time if klines: last_close_time_ms = int(klines[-1][6]) # close time ms # Advance by 1 ms past last close to avoid duplicate fetch current_start = datetime.utcfromtimestamp((last_close_time_ms + 1) / 1000).replace(tzinfo=timezone.utc) else: break # Light delay to avoid hammering await asyncio.sleep(0.05) except asyncio.CancelledError: self.logger.info(f"Download for {symbol} {interval} cancelled") break except Exception as e: self.logger.error(f"Error collecting {interval} data for {symbol}: {e}", exc_info=True) # Backoff before continuing or aborting loop await asyncio.sleep(max(0.5, retry_delay)) # Decide: keep attempting next loop iteration to avoid a hard stop # If persistent errors, the retries around REST will surface again # Optional: could break here if desired, but continuing is safer for coverage return total_records # --------------------------- # Technical indicators # --------------------------- async def _calculate_and_store_indicators(self, symbol: str, interval: str): """Calculate and store technical indicators for a symbol and interval""" try: # Check if indicators are enabled for this interval indicator_config = config.get('technical_indicators', {}) calc_intervals = indicator_config.get('calculation_intervals', ['1m', '5m', '15m', '1h', '4h', '1d']) if interval not in calc_intervals: self.logger.debug( f"Skipping indicators for {symbol} {interval} (not in calculation_intervals)" ) return # Get OHLCV data from database (need enough for longest indicator period) max_period = 200 # SMA-200, etc. ohlcv_data = await db_manager.get_ohlcv_data(symbol, interval, limit=max_period + 50) if len(ohlcv_data) < 50: self.logger.warning( f"Not enough data for indicators: {symbol} {interval} ({len(ohlcv_data)} records)" ) return # Convert to DataFrame df = pd.DataFrame(ohlcv_data) df['time'] = pd.to_datetime(df['time']) df = df.sort_values('time') df.set_index('time', inplace=True) # Rename columns for pandas_ta df = df.rename(columns={ 'open_price': 'open', 'high_price': 'high', 'low_price': 'low', 'close_price': 'close' }) # Calculate technical indicators indicators_data = calculate_technical_indicators(df, indicator_config) # Store indicators in database if indicators_data: await db_manager.insert_indicators_batch(symbol, interval, indicators_data) self.logger.info( f"Stored {len(indicators_data)} indicator values for {symbol} {interval}" ) except asyncio.CancelledError: self.logger.info(f"Indicator calculation cancelled for {symbol} {interval}") except Exception as e: self.logger.error( f"Error calculating indicators for {symbol} {interval}: {e}", exc_info=True ) # --------------------------- # Gap detection and filling # --------------------------- async def auto_fill_gaps( self, symbol: str, intervals: Optional[List[str]] = None, fill_genuine_gaps: bool = True ) -> Dict[str, Any]: """ Automatically fill gaps for a symbol Args: symbol: Trading pair symbol intervals: List of intervals to fill (default: from config) fill_genuine_gaps: Whether to fill genuine empty gaps with averages Returns: Dictionary with fill results """ # Acquire semaphore to limit concurrent gap fills async with self._gap_fill_semaphore: global config, db_manager if intervals is None: intervals = config.get('gap_filling', {}).get('intervals_to_monitor', ['1m', '5m', '15m', '1h', '4h', '1d']) self.logger.info(f"Starting auto gap fill for {symbol} on intervals: {intervals}") results: Dict[str, Any] = { 'symbol': symbol, 'intervals': {}, 'total_gaps_filled': 0, 'total_genuine_filled': 0 } try: # Get record_from_date for this symbol pair_config = next((p for p in config['trading_pairs'] if p['symbol'] == symbol), None) if not pair_config: self.logger.warning(f"Symbol {symbol} not found in config") return results record_from_date_iso = pair_config.get('record_from_date') or \ config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z') _ = datetime.fromisoformat(record_from_date_iso.replace('Z', '+00:00')) # reserved gap_config = config.get('gap_filling', {}) max_gap_size = gap_config.get('max_gap_size_candles', 1000) max_attempts = int(gap_config.get('max_fill_attempts', int(config.get("collection", {}).get("max_retries", int(os.getenv("MAX_RETRIES", "3")))))) averaging_lookback = gap_config.get('averaging_lookback_candles', 10) max_empty_seq = gap_config.get('max_consecutive_empty_candles', 5) for interval in intervals: self.logger.info(f"Checking gaps for {symbol} {interval}") # Detect gaps gaps_info = await db_manager.detect_gaps(symbol, interval) gaps = gaps_info.get('gaps', []) interval_result = { 'gaps_found': len(gaps), 'gaps_filled': 0, 'genuine_filled': 0, 'errors': [] } for gap in gaps: missing_candles = gap['missing_candles'] # Skip if gap is too large if missing_candles > max_gap_size: self.logger.info(f"Skipping large gap: {missing_candles} candles") interval_result['errors'].append(f"Gap too large: {missing_candles} candles") continue try: gap_start = datetime.fromisoformat(gap['gap_start']) gap_end = datetime.fromisoformat(gap['gap_end']) self.logger.info(f"Filling gap: {gap_start} to {gap_end}") # Attempt multiple real fills before resorting to averaging real_filled_records = 0 for attempt in range(1, max_attempts + 1): try: # Small buffer around the gap to ensure edges are covered buffered_start = gap_start - timedelta(milliseconds=1) buffered_end = gap_end + timedelta(milliseconds=1) added = await self._collect_historical_klines( symbol, interval, buffered_start, buffered_end ) real_filled_records += added if added > 0: # A successful fill; break early break except Exception as e: # Log and continue attempts interval_result['errors'].append( f"Attempt {attempt} failed for gap {gap_start}->{gap_end}: {e}" ) await asyncio.sleep(0.5 * attempt) if real_filled_records > 0: interval_result['gaps_filled'] += 1 results['total_gaps_filled'] += 1 self.logger.info(f"Successfully filled gap with {real_filled_records} records") else: # Genuine empty gap - fill with averages if enabled if fill_genuine_gaps and gap_config.get('enable_intelligent_averaging', True): filled = await db_manager.fill_genuine_gaps_with_averages( symbol, interval, max_empty_seq, averaging_lookback ) interval_result['genuine_filled'] += filled results['total_genuine_filled'] += filled # Small delay between gaps await asyncio.sleep(0.2) except Exception as e: error_msg = f"Error filling gap: {str(e)}" self.logger.error(error_msg) interval_result['errors'].append(error_msg) results['intervals'][interval] = interval_result # Calculate and store indicators after any fills if interval_result['gaps_filled'] > 0 or interval_result['genuine_filled'] > 0: try: await self._calculate_and_store_indicators(symbol, interval) except Exception as e: self.logger.error(f"Error calculating indicators: {e}", exc_info=True) self.logger.info(f"Auto gap fill completed for {symbol}: {results}") return results except Exception as e: self.logger.error(f"Error in auto gap fill: {e}", exc_info=True) results['error'] = str(e) return results async def start_auto_gap_fill_scheduler(self): """Start background task for automatic gap filling""" global config gap_config = config.get('gap_filling', {}) if not gap_config.get('enable_auto_gap_filling', False): self.logger.info("Auto gap filling is disabled") return schedule_hours = gap_config.get('auto_fill_schedule_hours', 24) self.logger.info(f"Starting auto gap fill scheduler (every {schedule_hours} hours)") self.is_collecting = True while self.is_collecting: try: # Get all enabled pairs enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)] for pair in enabled_pairs: symbol = pair['symbol'] self.logger.info(f"Running scheduled gap fill for {symbol}") try: await self.auto_fill_gaps( symbol, intervals=gap_config.get('intervals_to_monitor'), fill_genuine_gaps=gap_config.get('enable_intelligent_averaging', True) ) except Exception as e: self.logger.error(f"Error in scheduled gap fill for {symbol}: {e}") # Wait for next scheduled run self.logger.info(f"Next auto gap fill in {schedule_hours} hours") await asyncio.sleep(schedule_hours * 3600) except asyncio.CancelledError: self.logger.info("Auto gap fill scheduler cancelled") break except Exception as e: self.logger.error(f"Error in auto gap fill scheduler: {e}", exc_info=True) await asyncio.sleep(3600) # Wait 1 hour on error # --------------------------- # WebSocket continuous streams # --------------------------- async def start_continuous_collection(self): """Start continuous data collection via WebSocket""" if self.websocket_collection_running: self.logger.warning("WebSocket collection already running") return self.logger.info("Starting continuous WebSocket data collection") self.websocket_collection_running = True self.is_collecting = True # Create WebSocket tasks for each enabled trading pair enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)] if not enabled_pairs: self.logger.warning("No enabled trading pairs found") return for pair_config in enabled_pairs: symbol = pair_config['symbol'].lower() # Start kline streams for configured intervals for interval in config['collection']['candle_intervals']: task_name = f"kline_{symbol}_{interval}" task = asyncio.create_task( self._websocket_kline_stream(symbol, interval), name=task_name ) running_tasks[task_name] = task # Start trade stream for tick data task_name = f"trade_{symbol}" task = asyncio.create_task( self._websocket_trade_stream(symbol), name=task_name ) running_tasks[task_name] = task # Start auto gap fill scheduler task_name = "auto_gap_fill_scheduler" task = asyncio.create_task( self.start_auto_gap_fill_scheduler(), name=task_name ) running_tasks[task_name] = task self.logger.info(f"Started {len(running_tasks)} tasks including gap fill scheduler") async def _websocket_kline_stream(self, symbol: str, interval: str): """WebSocket stream for kline/candlestick data""" stream_name = f"{symbol}@kline_{interval}" uri = f"wss://stream.binance.com:9443/ws/{stream_name}" reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5) ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20)) ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60)) while self.websocket_collection_running: try: async with websockets.connect( uri, ping_interval=ping_interval, ping_timeout=ping_timeout ) as websocket: self.logger.info(f"Connected to {stream_name}") websocket_connections[stream_name] = websocket async for message in websocket: if not self.websocket_collection_running: break try: data = json.loads(message) # Validate event type and payload shape if data.get('e') != 'kline' or 'k' not in data: self.logger.debug(f"Ignored non-kline or malformed message on {stream_name}") continue # Parse kline data ohlcv_data = parse_kline_data(data) # Store in database await db_manager.insert_ohlcv_single(ohlcv_data) # Calculate indicators if kline is closed if data['k'].get('x'): await self._calculate_and_store_indicators( symbol.upper(), interval ) except json.JSONDecodeError: self.logger.error(f"Invalid JSON from {stream_name}") except asyncio.CancelledError: self.logger.info(f"Kline stream cancelled: {stream_name}") break except Exception as e: self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True) except websockets.exceptions.ConnectionClosed as e: self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}") except asyncio.CancelledError: self.logger.info(f"Kline WebSocket cancelled for {stream_name}") break except Exception as e: self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True) finally: # Clean up if stream_name in websocket_connections: websocket_connections.pop(stream_name, None) if self.websocket_collection_running: self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...") await asyncio.sleep(reconnect_delay) async def _websocket_trade_stream(self, symbol: str): """WebSocket stream for trade/tick data""" stream_name = f"{symbol}@trade" uri = f"wss://stream.binance.com:9443/ws/{stream_name}" reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5) ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20)) ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60)) while self.websocket_collection_running: try: async with websockets.connect( uri, ping_interval=ping_interval, ping_timeout=ping_timeout ) as websocket: self.logger.info(f"Connected to {stream_name}") websocket_connections[stream_name] = websocket tick_batch: List[Dict[str, Any]] = [] batch_size = int(config.get('collection', {}).get( 'tick_batch_size', int(os.getenv("TICK_BATCH_SIZE", "100")) )) async for message in websocket: if not self.websocket_collection_running: break try: data = json.loads(message) if data.get('e') == 'trade': # Parse trade data tick_data = parse_trade_data(data) tick_batch.append(tick_data) # Batch insert when batch is full if len(tick_batch) >= batch_size: await db_manager.insert_ticks_batch(tick_batch) tick_batch = [] except json.JSONDecodeError: self.logger.error(f"Invalid JSON from {stream_name}") except asyncio.CancelledError: self.logger.info(f"Trade stream cancelled: {stream_name}") break except Exception as e: self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True) # Insert remaining ticks if tick_batch: await db_manager.insert_ticks_batch(tick_batch) except websockets.exceptions.ConnectionClosed as e: self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}") except asyncio.CancelledError: self.logger.info(f"Trade WebSocket cancelled for {stream_name}") break except Exception as e: self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True) finally: # Clean up if stream_name in websocket_connections: websocket_connections.pop(stream_name, None) if self.websocket_collection_running: self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...") await asyncio.sleep(reconnect_delay) async def stop_continuous_collection(self): """Stop continuous data collection""" if not self.websocket_collection_running: self.logger.warning("WebSocket collection not running") return self.logger.info("Stopping continuous data collection") self.websocket_collection_running = False self.is_collecting = False # Cancel all running tasks for task_name, task in list(running_tasks.items()): if not task.done(): task.cancel() try: await task except asyncio.CancelledError: self.logger.info(f"Cancelled task: {task_name}") except Exception as e: self.logger.error(f"Error cancelling task {task_name}: {e}") # Close WebSocket connections for conn_name, conn in list(websocket_connections.items()): try: await conn.close() self.logger.info(f"Closed WebSocket: {conn_name}") except Exception as e: self.logger.error(f"Error closing WebSocket {conn_name}: {e}") running_tasks.clear() websocket_connections.clear() self.logger.info("Continuous data collection stopped") # --------------------------- # Candle generation from ticks # --------------------------- async def generate_candles_from_ticks( self, symbol: str, interval: str, start_time: datetime, end_time: datetime ): """ Generate OHLCV candles from tick data Args: symbol: Trading pair symbol interval: Candle interval (e.g., '1m', '5m', '1h') start_time: Start time for candle generation end_time: End time for candle generation """ self.logger.info(f"Generating {interval} candles from ticks for {symbol}") # Get tick data from database ticks = await db_manager.get_tick_data(symbol, start_time, end_time) if not ticks: self.logger.warning(f"No tick data found for {symbol}") return # Convert to DataFrame df = pd.DataFrame(ticks) df['time'] = pd.to_datetime(df['time']) df.set_index('time', inplace=True) # Resample to create OHLCV data ohlcv = df['price'].resample(interval).agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last' }) volume = df['quantity'].resample(interval).sum() trade_count = df.resample(interval).size() # Combine data candles: List[Dict[str, Any]] = [] for timestamp, row in ohlcv.iterrows(): if pd.notna(row['open']): # Skip empty periods candle = { 'time': timestamp, 'symbol': symbol, 'exchange': 'binance', 'interval': interval, 'open_price': float(row['open']), 'high_price': float(row['high']), 'low_price': float(row['low']), 'close_price': float(row['close']), 'volume': float(volume.loc[timestamp]) if timestamp in volume.index else 0.0, 'quote_volume': None, 'trade_count': int(trade_count.loc[timestamp]) if timestamp in trade_count.index else 0 } candles.append(candle) # Store candles in database if candles: await db_manager.insert_ohlcv_batch(candles) self.logger.info(f"Generated and stored {len(candles)} candles for {symbol} {interval}") # Calculate technical indicators await self._calculate_and_store_indicators(symbol, interval) else: self.logger.warning(f"No candles generated for {symbol} {interval}") # --------------------------- # Progress and cleanup # --------------------------- async def get_download_progress(self, symbol: str = None) -> Dict[str, Any]: """Get download progress for a symbol or all symbols""" if symbol: return self.download_progress.get(symbol, {'status': 'not_found'}) return self.download_progress async def cleanup(self): """Clean up resources""" await self.stop_continuous_collection() # db_manager may have a close method; guard if absent try: if db_manager and hasattr(db_manager, "close"): await db_manager.close() except Exception as e: self.logger.warning(f"Error closing database manager: {e}") self.logger.info("BinanceDataCollector cleanup complete") # --------------------------- # UI process management # --------------------------- def start_ui_server(): """Start the UI server as a subprocess""" global ui_process logger = logging.getLogger(__name__) try: # Get UI configuration from environment host = os.getenv("WEB_HOST", "0.0.0.0") port = os.getenv("WEB_PORT", "8000") logger.info(f"Starting UI server on {host}:{port}") # Start ui.py as a subprocess ui_process = subprocess.Popen( [sys.executable, "ui.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1 ) logger.info(f"✓ UI server started with PID: {ui_process.pid}") # Start a thread to log UI output import threading def parse_log_level(line: str) -> int: """Parse the log level from a log line""" # Check for standard Python logging format if ' - ERROR - ' in line or 'ERROR:' in line: return logging.ERROR elif ' - WARNING - ' in line or 'WARNING:' in line or ' - WARN - ' in line: return logging.WARNING elif ' - DEBUG - ' in line or 'DEBUG:' in line: return logging.DEBUG else: # Default to INFO for all other lines return logging.INFO def log_ui_output(): """Log UI stdout messages""" if not ui_process.stdout: return for line in ui_process.stdout: line = line.rstrip() if line: log_level = parse_log_level(line) logger.log(log_level, f"[UI] {line}") def log_ui_stderr(): """Log UI stderr messages with intelligent level detection""" if not ui_process.stderr: return for line in ui_process.stderr: line = line.rstrip() if line: log_level = parse_log_level(line) logger.log(log_level, f"[UI] {line}") stdout_thread = threading.Thread(target=log_ui_output, daemon=True) stderr_thread = threading.Thread(target=log_ui_stderr, daemon=True) stdout_thread.start() stderr_thread.start() except Exception as e: logger.error(f"✗ Failed to start UI server: {e}") raise def stop_ui_server(): """Stop the UI server subprocess""" global ui_process logger = logging.getLogger(__name__) if ui_process: try: logger.info("Stopping UI server...") ui_process.terminate() try: ui_process.wait(timeout=10) logger.info("✓ UI server stopped gracefully") except subprocess.TimeoutExpired: logger.warning("UI server didn't stop gracefully, forcing...") ui_process.kill() ui_process.wait() logger.info("✓ UI server forcefully stopped") except Exception as e: logger.error(f"✗ Error stopping UI server: {e}") finally: ui_process = None else: logger.debug("UI server process not running") # --------------------------- # Global signal handlers # --------------------------- def signal_handler(signum, frame): """Handle shutdown signals""" logger = logging.getLogger(__name__) logger.info(f"Received signal {signum}, initiating shutdown...") # Stop UI server stop_ui_server() # Cancel all running tasks for task in asyncio.all_tasks(): task.cancel() # --------------------------- # Main entry point # --------------------------- async def main(): """Main application entry point""" # Setup signal handlers signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) collector = BinanceDataCollector() try: # Initialize the collector await collector.initialize() # Start UI server start_ui_server() # Optionally kick off an initial full backfill for all configured pairs if config.get("collection", {}).get("initial_full_backfill", True): # Launch as a background task so WebSockets can start immediately task_name = "initial_full_backfill" backfill_task = asyncio.create_task(collector.start_bulk_download_for_all_pairs(), name=task_name) running_tasks[task_name] = backfill_task # Start continuous collection (kline + trade streams) and gap scheduler await collector.start_continuous_collection() # Keep the application running while collector.websocket_collection_running: await asyncio.sleep(1) except KeyboardInterrupt: logging.getLogger(__name__).info("Received keyboard interrupt") except asyncio.CancelledError: logging.getLogger(__name__).info("Application cancelled") except Exception as e: logging.getLogger(__name__).error(f"Application error: {e}", exc_info=True) finally: # Clean shutdown logging.getLogger(__name__).info("Initiating shutdown...") # Stop UI server first stop_ui_server() # Then cleanup collector await collector.cleanup() logging.getLogger(__name__).info("Application shutdown complete") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: print("\nShutdown requested by user") except Exception as e: print(f"Fatal error: {e}") sys.exit(1)