From e891cd7c1518848f3607cc77ad3bdaa185550523 Mon Sep 17 00:00:00 2001
From: lewismac
Date: Thu, 9 Oct 2025 08:51:09 +0100
Subject: [PATCH] feat: Integrate 100% indicator coverage enforcement into data collection

- Add _ensure_indicator_coverage() to verify and backfill after data collection
- Add start_indicator_coverage_monitor() background task for periodic checks
- Configure coverage monitoring with ensure_100_percent_coverage flag
- Set coverage_check_interval_hours (default: 6 hours) for monitoring frequency
- Set backfill_batch_size (default: 200) for efficient backfilling
- Call coverage check after bulk downloads, gap fills, and candle generation
- Start indicator_coverage_monitor task in continuous collection mode
- Log coverage percentages and backfill results for transparency
- Ensure all OHLCV records have complete technical indicator coverage

This integrates the new db.py indicator coverage methods into the main data
collection workflow, ensuring 100% coverage is automatically maintained
across all symbol/interval combinations.
---
 main.py | 253 +++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 225 insertions(+), 28 deletions(-)

diff --git a/main.py b/main.py
index 3fa7ce7..a74a489 100644
--- a/main.py
+++ b/main.py
@@ -1,6 +1,7 @@
 #!/usr/bin/env python3
 """
+
 main.py - Complete Binance Trading Data Collection System

 Main application entry point with async data collection, websocket handling, bulk
@@ -18,9 +19,8 @@ import os
 from datetime import datetime, timedelta, timezone
 from typing import Dict, List, Optional, Any, Tuple
 from contextlib import asynccontextmanager
-
 import websockets
-import aiohttp # kept for future-proofing network ops
+import aiohttp  # kept for future-proofing network ops
 from binance.client import Client
 from binance.exceptions import BinanceAPIException
 import pandas as pd
@@ -34,6 +34,7 @@ from utils import (
     calculate_technical_indicators, validate_symbol, format_timestamp
 )
+
 # Load environment variables
 load_dotenv('variables.env')
@@ -60,7 +61,6 @@ class BinanceDataCollector:
         max_gap_fills = int(os.getenv('MAX_CONCURRENT_GAP_FILLS', '2'))
         self._download_semaphore = asyncio.Semaphore(max_downloads)
         self._gap_fill_semaphore = asyncio.Semaphore(max_gap_fills)
-
         self.logger.info(
             f"Initialized with max {max_downloads} concurrent downloads, {max_gap_fills} gap fills"
         )
@@ -71,7 +71,6 @@ class BinanceDataCollector:
         # Setup logging
         setup_logging()
-
         self.logger.info("Initializing Binance Data Collector")

         # Load configuration
@@ -97,6 +96,12 @@ class BinanceDataCollector:
         gap.setdefault("averaging_lookback_candles", 10)
         gap.setdefault("max_consecutive_empty_candles", 5)

+        # Add indicator coverage configuration
+        ind = config.setdefault("technical_indicators", {})
+        ind.setdefault("ensure_100_percent_coverage", True)
+        ind.setdefault("coverage_check_interval_hours", 6)
+        ind.setdefault("backfill_batch_size", 200)
+
         self.logger.info(f"Loaded configuration for {len(config['trading_pairs'])} trading pairs")

         # Initialize database
@@ -107,6 +112,7 @@ class BinanceDataCollector:
         # Initialize Binance client (no API key needed for market data)
         api_key = os.getenv('BINANCE_API_KEY')
         secret_key = os.getenv('BINANCE_SECRET_KEY')
+
         if api_key and secret_key:
             self.client = Client(api_key, secret_key)
             self.logger.info("Binance client initialized with API credentials")
@@ -124,8 +130,8 @@ class BinanceDataCollector:
         starting from record_from_date for each pair, across all configured intervals.
         """
         global config
-
         enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
+
         if not enabled_pairs:
             self.logger.warning("No enabled trading pairs found for bulk backfill")
             return
@@ -140,6 +146,7 @@ class BinanceDataCollector:
         for pair in enabled_pairs:
             symbol = pair['symbol'].upper()
             start_iso = pair.get('record_from_date') or config["collection"]["default_record_from_date"]
+
             try:
                 start_dt = datetime.fromisoformat(start_iso.replace("Z", "+00:00"))
             except Exception:
@@ -155,6 +162,7 @@ class BinanceDataCollector:
         # Execute with graceful progress logging
         self.logger.info(f"Launching bulk backfill for {len(tasks)} symbols...")
         results = await asyncio.gather(*tasks, return_exceptions=True)
+
         errors = [r for r in results if isinstance(r, Exception)]
         if errors:
             self.logger.error(f"Bulk backfill completed with {len(errors)} errors; see logs for details")
@@ -172,6 +180,7 @@ class BinanceDataCollector:
         bounded by the download semaphore to control exchange load.
         """
         global config
+
         async with self._download_semaphore:
             end_date = end_date or datetime.now(timezone.utc)
@@ -186,6 +195,7 @@ class BinanceDataCollector:
             # Spawn all intervals concurrently for this symbol
             self.logger.info(f"Starting concurrent bulk for {symbol} on {intervals}")
+
             interval_tasks = [
                 asyncio.create_task(
                     self._bulk_download_one_interval(symbol, interval, start_date, end_date),
@@ -202,7 +212,7 @@ class BinanceDataCollector:
                 self.download_progress[symbol]["error"] = "One or more intervals failed"
             else:
                 self.download_progress[symbol]["status"] = "completed"
-                self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat()
+                self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat()

     async def _bulk_download_one_interval(
         self,
@@ -218,10 +228,17 @@ class BinanceDataCollector:
             sp["intervals"][interval]["status"] = "checking"
             records_count = await self._collect_historical_klines(symbol, interval, start_date, end_date)
+
             if records_count > 0:
                 sp["intervals"][interval]["status"] = "calculating_indicators"
                 sp["intervals"][interval]["records"] = records_count
+
+                # Calculate indicators
                 await self._calculate_and_store_indicators(symbol, interval)
+
+                # Ensure 100% indicator coverage
+                await self._ensure_indicator_coverage(symbol, interval)
+
                 sp["intervals"][interval]["status"] = "completed"
                 self.logger.info(f"Completed {interval} data for {symbol} - {records_count} new records")
             else:
@@ -256,7 +273,7 @@ class BinanceDataCollector:
             # Get intervals
             if intervals is None:
                 intervals = config.get("collection", {}).get("candle_intervals",
-                    ["1m", "5m", "15m", "1h", "4h", "1d"])
+                    ["1m", "5m", "15m", "1h", "4h", "1d"])

             # Initialize progress tracking
             self.download_progress[symbol] = {
@@ -272,13 +289,14 @@ class BinanceDataCollector:
             # Run intervals concurrently to improve throughput for one symbol
             tasks = [
                 asyncio.create_task(self._bulk_download_one_interval(symbol, interval, start_date, end_date),
-                                    name=f"bulk_single_{symbol}_{interval}")
+                                    name=f"bulk_single_{symbol}_{interval}")
                 for interval in intervals
             ]
-            await asyncio.gather(*tasks)
+            await asyncio.gather(*tasks)

             self.download_progress[symbol]["status"] = "completed"
             self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat()
+
         except Exception as e:
             self.logger.error(f"Error in bulk download for {symbol}: {e}", exc_info=True)
             self.download_progress[symbol]["status"] = "error"
@@ -299,6 +317,7 @@ class BinanceDataCollector:
         coverage_check = await db_manager.check_data_exists_for_range(
             symbol, interval, start_date, end_date
         )
+
         self.logger.info(
             f"Data coverage for {symbol} {interval}: "
             f"{coverage_check['coverage_percent']:.2f}% "
@@ -328,9 +347,11 @@ class BinanceDataCollector:
         # Download each missing range
         total_new_records = 0
+
         for idx, time_range in enumerate(missing_ranges, 1):
             range_start = time_range['start']
             range_end = time_range['end']
+
             self.logger.info(
                 f"Downloading range {idx}/{len(missing_ranges)}: "
                 f"{range_start} to {range_end} for {symbol} {interval}"
@@ -339,7 +360,9 @@ class BinanceDataCollector:
             records_in_range = await self._download_time_range(
                 symbol, interval, range_start, range_end
             )
+
             total_new_records += records_in_range
+
             self.logger.info(
                 f"Downloaded {records_in_range} records for range {idx}/{len(missing_ranges)}"
             )
@@ -395,7 +418,7 @@ class BinanceDataCollector:
                 "q": str(kline_row[7]),
                 "V": None,  # taker buy base vol (optional)
                 "Q": None,  # taker buy quote vol (optional)
-                "B": None # ignore
+                "B": None  # ignore
             }
         }
@@ -413,6 +436,7 @@ class BinanceDataCollector:
                 end_str=end_ms,
                 limit=limit
             )
+
         return await asyncio.to_thread(call)

     async def _download_time_range(
         self,
@@ -427,9 +451,9 @@ class BinanceDataCollector:
         # Resolve batch size and retry policy, prefer config then env
         chunk_size = int(config.get("collection", {}).get("bulk_chunk_size",
-            int(os.getenv("BULK_DOWNLOAD_BATCH_SIZE", "1000"))))
+            int(os.getenv("BULK_DOWNLOAD_BATCH_SIZE", "1000"))))
         max_retries = int(config.get("collection", {}).get("max_retries",
-            int(os.getenv("MAX_RETRIES", "3"))))
+            int(os.getenv("MAX_RETRIES", "3"))))
         retry_delay = float(config.get("collection", {}).get("retry_delay", 1))

         # Normalize time inputs
@@ -439,6 +463,7 @@ class BinanceDataCollector:
         if isinstance(end_date, dt_time):
             base_date = start_date.date() if isinstance(start_date, datetime) else datetime.now(timezone.utc).date()
             end_date = datetime.combine(base_date, end_date)
+
         if start_date.tzinfo is None:
             start_date = start_date.replace(tzinfo=timezone.utc)
         if end_date.tzinfo is None:
@@ -459,6 +484,7 @@ class BinanceDataCollector:
                 chunk_end = end

             klines: Optional[List[List[Any]]] = None
+
             # Try with retry policy; also handle rate-limit backoffs
             for attempt in range(max_retries):
                 try:
@@ -470,6 +496,7 @@ class BinanceDataCollector:
                         limit=chunk_size
                     )
                     break
+
                 except BinanceAPIException as e:
                     if e.code == -1003:  # Rate limit
                         wait_time = retry_delay * (2 ** attempt)
@@ -480,6 +507,7 @@ class BinanceDataCollector:
                     if attempt == max_retries - 1:
                         raise
                     await asyncio.sleep(retry_delay)
+
                 except Exception as e:
                     self.logger.warning(f"Attempt {attempt + 1}/{max_retries} failed for {symbol} {interval}: {e}")
                     if attempt == max_retries - 1:
@@ -489,10 +517,12 @@ class BinanceDataCollector:
             # No data returned; advance conservatively or terminate
             if not klines or len(klines) == 0:
                 consecutive_empty += 1
+
                 # If multiple consecutive empty chunks, assume past available history
                 if consecutive_empty >= 2:
                     self.logger.info(f"No more data available for {symbol} {interval}; ending range loop")
                     break
+
                 # Otherwise, advance by one chunk and continue
                 current_start = chunk_end + timedelta(milliseconds=1)
                 await asyncio.sleep(0.05)
@@ -540,6 +570,7 @@ class BinanceDataCollector:
             except asyncio.CancelledError:
                 self.logger.info(f"Download for {symbol} {interval} cancelled")
                 break
+
             except Exception as e:
                 self.logger.error(f"Error collecting {interval} data for {symbol}: {e}", exc_info=True)
                 # Backoff before continuing or aborting loop
@@ -560,7 +591,8 @@ class BinanceDataCollector:
         # Check if indicators are enabled for this interval
         indicator_config = config.get('technical_indicators', {})
         calc_intervals = indicator_config.get('calculation_intervals',
-            ['1m', '5m', '15m', '1h', '4h', '1d'])
+            ['1m', '5m', '15m', '1h', '4h', '1d'])
+
         if interval not in calc_intervals:
             self.logger.debug(
                 f"Skipping indicators for {symbol} {interval} (not in calculation_intervals)"
             )
@@ -603,12 +635,117 @@ class BinanceDataCollector:
         except asyncio.CancelledError:
             self.logger.info(f"Indicator calculation cancelled for {symbol} {interval}")
+
         except Exception as e:
             self.logger.error(
                 f"Error calculating indicators for {symbol} {interval}: {e}",
                 exc_info=True
             )

+    async def _ensure_indicator_coverage(self, symbol: str, interval: str):
+        """
+        Ensure 100% technical indicator coverage for a symbol/interval.
+        Checks coverage and backfills if needed.
+        """
+        try:
+            indicator_config = config.get('technical_indicators', {})
+
+            # Check if 100% coverage enforcement is enabled
+            if not indicator_config.get('ensure_100_percent_coverage', True):
+                return
+
+            # Check current coverage
+            coverage = await db_manager.check_indicator_coverage(symbol, interval)
+
+            if coverage['coverage_percent'] < 99.9:
+                self.logger.warning(
+                    f"Indicator coverage for {symbol} {interval}: {coverage['coverage_percent']:.2f}% - backfilling..."
+                )
+
+                # Backfill missing indicators
+                batch_size = indicator_config.get('backfill_batch_size', 200)
+                backfill_result = await db_manager.backfill_missing_indicators(
+                    symbol, interval, batch_size=batch_size
+                )
+
+                if backfill_result['status'] == 'success':
+                    self.logger.info(
+                        f"Indicator backfill complete for {symbol} {interval}: "
+                        f"{backfill_result['coverage_before']:.2f}% → {backfill_result['coverage_after']:.2f}% "
+                        f"({backfill_result['indicators_added']} indicators added)"
+                    )
+                else:
+                    self.logger.error(
+                        f"Indicator backfill failed for {symbol} {interval}: {backfill_result.get('error')}"
+                    )
+            else:
+                self.logger.info(
+                    f"Indicator coverage for {symbol} {interval}: {coverage['coverage_percent']:.2f}% ✓"
+                )
+
+        except Exception as e:
+            self.logger.error(
+                f"Error ensuring indicator coverage for {symbol} {interval}: {e}",
+                exc_info=True
+            )
+
+    async def start_indicator_coverage_monitor(self):
+        """
+        Background task to periodically check and ensure 100% indicator coverage
+        for all symbol/interval combinations.
+        """
+        global config
+
+        indicator_config = config.get('technical_indicators', {})
+
+        if not indicator_config.get('ensure_100_percent_coverage', True):
+            self.logger.info("Indicator coverage monitoring is disabled")
+            return
+
+        check_interval_hours = indicator_config.get('coverage_check_interval_hours', 6)
+        self.logger.info(
+            f"Starting indicator coverage monitor (every {check_interval_hours} hours)"
+        )
+
+        while self.is_collecting:
+            try:
+                # Get all enabled pairs
+                enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
+                intervals = config.get('collection', {}).get('candle_intervals',
+                                                             ['1m', '5m', '15m', '1h', '4h', '1d'])
+
+                self.logger.info("Running scheduled indicator coverage check...")
+
+                for pair in enabled_pairs:
+                    symbol = pair['symbol']
+
+                    for interval in intervals:
+                        try:
+                            await self._ensure_indicator_coverage(symbol, interval)
+
+                            # Small delay between checks
+                            await asyncio.sleep(1)
+
+                        except Exception as e:
+                            self.logger.error(
+                                f"Error checking coverage for {symbol} {interval}: {e}"
+                            )
+
+                self.logger.info(
+                    f"Indicator coverage check complete. Next check in {check_interval_hours} hours"
+                )
+
+                # Wait for next scheduled run
+                await asyncio.sleep(check_interval_hours * 3600)
+
+            except asyncio.CancelledError:
+                self.logger.info("Indicator coverage monitor cancelled")
+                break
+
+            except Exception as e:
+                self.logger.error(f"Error in indicator coverage monitor: {e}", exc_info=True)
+                await asyncio.sleep(3600)  # Wait 1 hour on error
+
     # ---------------------------
     # Gap detection and filling
     # ---------------------------
@@ -621,7 +758,6 @@ class BinanceDataCollector:
     ) -> Dict[str, Any]:
         """
         Automatically fill gaps for a symbol
-
         Args:
             symbol: Trading pair symbol
             intervals: List of intervals to fill (default: from config)
@@ -635,7 +771,7 @@ class BinanceDataCollector:
         if intervals is None:
             intervals = config.get('gap_filling', {}).get('intervals_to_monitor',
-                ['1m', '5m', '15m', '1h', '4h', '1d'])
+                ['1m', '5m', '15m', '1h', '4h', '1d'])

         self.logger.info(f"Starting auto gap fill for {symbol} on intervals: {intervals}")
@@ -654,22 +790,24 @@ class BinanceDataCollector:
             return results

         record_from_date_iso = pair_config.get('record_from_date') or \
-            config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
+            config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
         _ = datetime.fromisoformat(record_from_date_iso.replace('Z', '+00:00'))  # reserved

         gap_config = config.get('gap_filling', {})
         max_gap_size = gap_config.get('max_gap_size_candles', 1000)
         max_attempts = int(gap_config.get('max_fill_attempts',
-            int(config.get("collection", {}).get("max_retries",
-                int(os.getenv("MAX_RETRIES", "3"))))))
+            int(config.get("collection", {}).get("max_retries",
+                int(os.getenv("MAX_RETRIES", "3"))))))
         averaging_lookback = gap_config.get('averaging_lookback_candles', 10)
         max_empty_seq = gap_config.get('max_consecutive_empty_candles', 5)

         for interval in intervals:
             self.logger.info(f"Checking gaps for {symbol} {interval}")
+
             # Detect gaps
             gaps_info = await db_manager.detect_gaps(symbol, interval)
             gaps = gaps_info.get('gaps', [])
+
             interval_result = {
                 'gaps_found': len(gaps),
                 'gaps_filled': 0,
@@ -679,6 +817,7 @@ class BinanceDataCollector:
             for gap in gaps:
                 missing_candles = gap['missing_candles']
+
                 # Skip if gap is too large
                 if missing_candles > max_gap_size:
                     self.logger.info(f"Skipping large gap: {missing_candles} candles")
@@ -688,6 +827,7 @@ class BinanceDataCollector:
                 try:
                     gap_start = datetime.fromisoformat(gap['gap_start'])
                     gap_end = datetime.fromisoformat(gap['gap_end'])
+
                     self.logger.info(f"Filling gap: {gap_start} to {gap_end}")

                     # Attempt multiple real fills before resorting to averaging
@@ -697,13 +837,17 @@ class BinanceDataCollector:
                             # Small buffer around the gap to ensure edges are covered
                             buffered_start = gap_start - timedelta(milliseconds=1)
                             buffered_end = gap_end + timedelta(milliseconds=1)
+
                             added = await self._collect_historical_klines(
                                 symbol, interval, buffered_start, buffered_end
                             )
+
                             real_filled_records += added
+
                             if added > 0:
                                 # A successful fill; break early
                                 break
+
                         except Exception as e:
                             # Log and continue attempts
                             interval_result['errors'].append(
@@ -715,6 +859,7 @@ class BinanceDataCollector:
                         interval_result['gaps_filled'] += 1
                         results['total_gaps_filled'] += 1
                         self.logger.info(f"Successfully filled gap with {real_filled_records} records")
+
                     else:
                         # Genuine empty gap - fill with averages if enabled
                         if fill_genuine_gaps and gap_config.get('enable_intelligent_averaging', True):
                             filled = await self._fill_genuine_gap_with_averages(
                                 symbol, interval, gap_start, gap_end,
                                 max_empty_seq, averaging_lookback
                             )
+
                             interval_result['genuine_filled'] += filled
                             results['total_genuine_filled'] += filled
@@ -736,10 +882,11 @@ class BinanceDataCollector:

             results['intervals'][interval] = interval_result

-            # Calculate and store indicators after any fills
+            # Calculate and store indicators after any fills, then ensure coverage
             if interval_result['gaps_filled'] > 0 or interval_result['genuine_filled'] > 0:
                 try:
                     await self._calculate_and_store_indicators(symbol, interval)
+                    await self._ensure_indicator_coverage(symbol, interval)
                 except Exception as e:
                     self.logger.error(f"Error calculating indicators: {e}", exc_info=True)
@@ -754,6 +901,7 @@ class BinanceDataCollector:
     async def start_auto_gap_fill_scheduler(self):
         """Start background task for automatic gap filling"""
         global config
+
         gap_config = config.get('gap_filling', {})
         if not gap_config.get('enable_auto_gap_filling', False):
             self.logger.info("Auto gap filling is disabled")
@@ -761,21 +909,25 @@ class BinanceDataCollector:

         schedule_hours = gap_config.get('auto_fill_schedule_hours', 24)
         self.logger.info(f"Starting auto gap fill scheduler (every {schedule_hours} hours)")
+
         self.is_collecting = True

         while self.is_collecting:
             try:
                 # Get all enabled pairs
                 enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
+
                 for pair in enabled_pairs:
                     symbol = pair['symbol']
                     self.logger.info(f"Running scheduled gap fill for {symbol}")
+
                     try:
                         await self.auto_fill_gaps(
                             symbol,
                             intervals=gap_config.get('intervals_to_monitor'),
                             fill_genuine_gaps=gap_config.get('enable_intelligent_averaging', True)
                         )
+
                     except Exception as e:
                         self.logger.error(f"Error in scheduled gap fill for {symbol}: {e}")
@@ -786,6 +938,7 @@ class BinanceDataCollector:
             except asyncio.CancelledError:
                 self.logger.info("Auto gap fill scheduler cancelled")
                 break
+
             except Exception as e:
                 self.logger.error(f"Error in auto gap fill scheduler: {e}", exc_info=True)
                 await asyncio.sleep(3600)  # Wait 1 hour on error
@@ -806,6 +959,7 @@ class BinanceDataCollector:

         # Create WebSocket tasks for each enabled trading pair
         enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
+
         if not enabled_pairs:
             self.logger.warning("No enabled trading pairs found")
             return
@@ -837,13 +991,22 @@ class BinanceDataCollector:
             name=task_name
         )
         running_tasks[task_name] = task
+
+        # Start indicator coverage monitor
+        task_name = "indicator_coverage_monitor"
+        task = asyncio.create_task(
+            self.start_indicator_coverage_monitor(),
+            name=task_name
+        )
+        running_tasks[task_name] = task

-        self.logger.info(f"Started {len(running_tasks)} tasks including gap fill scheduler")
+        self.logger.info(f"Started {len(running_tasks)} tasks including gap fill scheduler and indicator coverage monitor")

     async def _websocket_kline_stream(self, symbol: str, interval: str):
         """WebSocket stream for kline/candlestick data"""
         stream_name = f"{symbol}@kline_{interval}"
         uri = f"wss://stream.binance.com:9443/ws/{stream_name}"
+
         reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5)
         ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20))
         ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60))
@@ -861,6 +1024,7 @@ class BinanceDataCollector:
                     async for message in websocket:
                         if not self.websocket_collection_running:
                             break
+
                         try:
                             data = json.loads(message)
@@ -871,6 +1035,7 @@ class BinanceDataCollector:
                                 # Parse kline data
                                 ohlcv_data = parse_kline_data(data)
+
                                 # Store in database
                                 await db_manager.insert_ohlcv_single(ohlcv_data)
@@ -882,19 +1047,24 @@ class BinanceDataCollector:
                         except json.JSONDecodeError:
                             self.logger.error(f"Invalid JSON from {stream_name}")
+
                         except asyncio.CancelledError:
                             self.logger.info(f"Kline stream cancelled: {stream_name}")
                             break
+
                         except Exception as e:
                             self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True)

             except websockets.exceptions.ConnectionClosed as e:
                 self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}")
+
             except asyncio.CancelledError:
                 self.logger.info(f"Kline WebSocket cancelled for {stream_name}")
                 break
+
             except Exception as e:
                 self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True)
+
             finally:
                 # Clean up
                 if stream_name in websocket_connections:
@@ -908,6 +1078,7 @@ class BinanceDataCollector:
         """WebSocket stream for trade/tick data"""
         stream_name = f"{symbol}@trade"
         uri = f"wss://stream.binance.com:9443/ws/{stream_name}"
+
         reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5)
         ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20))
         ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60))
@@ -931,8 +1102,10 @@ class BinanceDataCollector:
                     async for message in websocket:
                         if not self.websocket_collection_running:
                             break
+
                         try:
                             data = json.loads(message)
+
                             if data.get('e') == 'trade':
                                 # Parse trade data
                                 tick_data = parse_trade_data(data)
@@ -945,9 +1118,11 @@ class BinanceDataCollector:
                         except json.JSONDecodeError:
                             self.logger.error(f"Invalid JSON from {stream_name}")
+
                         except asyncio.CancelledError:
                             self.logger.info(f"Trade stream cancelled: {stream_name}")
                             break
+
                         except Exception as e:
                             self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True)
@@ -957,11 +1132,14 @@ class BinanceDataCollector:
             except websockets.exceptions.ConnectionClosed as e:
                 self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}")
+
             except asyncio.CancelledError:
                 self.logger.info(f"Trade WebSocket cancelled for {stream_name}")
                 break
+
             except Exception as e:
                 self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True)
+
             finally:
                 # Clean up
                 if stream_name in websocket_connections:
@@ -985,12 +1163,12 @@ class BinanceDataCollector:
         for task_name, task in list(running_tasks.items()):
             if not task.done():
                 task.cancel()
-                try:
-                    await task
-                except asyncio.CancelledError:
-                    self.logger.info(f"Cancelled task: {task_name}")
-                except Exception as e:
-                    self.logger.error(f"Error cancelling task {task_name}: {e}")
+            try:
+                await task
+            except asyncio.CancelledError:
+                self.logger.info(f"Cancelled task: {task_name}")
+            except Exception as e:
+                self.logger.error(f"Error cancelling task {task_name}: {e}")

         # Close WebSocket connections
         for conn_name, conn in list(websocket_connections.items()):
@@ -1002,6 +1180,7 @@ class BinanceDataCollector:

         running_tasks.clear()
         websocket_connections.clear()
+
         self.logger.info("Continuous data collection stopped")

     # ---------------------------
@@ -1027,6 +1206,7 @@ class BinanceDataCollector:

         # Get tick data from database
         ticks = await db_manager.get_tick_data(symbol, start_time, end_time)
+
         if not ticks:
             self.logger.warning(f"No tick data found for {symbol}")
             return
@@ -1043,11 +1223,13 @@ class BinanceDataCollector:
             'low': 'min',
             'close': 'last'
         })
+
         volume = df['quantity'].resample(interval).sum()
         trade_count = df.resample(interval).size()

         # Combine data
         candles: List[Dict[str, Any]] = []
+
         for timestamp, row in ohlcv.iterrows():
             if pd.notna(row['open']):  # Skip empty periods
                 candle = {
@@ -1069,8 +1251,11 @@ class BinanceDataCollector:
         if candles:
             await db_manager.insert_ohlcv_batch(candles)
             self.logger.info(f"Generated and stored {len(candles)} candles for {symbol} {interval}")
-            # Calculate technical indicators
+
+            # Calculate technical indicators and ensure coverage
             await self._calculate_and_store_indicators(symbol, interval)
+            await self._ensure_indicator_coverage(symbol, interval)
+
         else:
             self.logger.warning(f"No candles generated for {symbol} {interval}")
@@ -1111,6 +1296,7 @@ def start_ui_server():
     # Get UI configuration from environment
     host = os.getenv("WEB_HOST", "0.0.0.0")
     port = os.getenv("WEB_PORT", "8000")
+
     logger.info(f"Starting UI server on {host}:{port}")

     # Start ui.py as a subprocess
@@ -1121,6 +1307,7 @@ def start_ui_server():
         text=True,
         bufsize=1
     )
+
     logger.info(f"✓ UI server started with PID: {ui_process.pid}")

     # Start a thread to log UI output
@@ -1161,6 +1348,7 @@ def start_ui_server():

     stdout_thread = threading.Thread(target=log_ui_output, daemon=True)
     stderr_thread = threading.Thread(target=log_ui_stderr, daemon=True)
+
     stdout_thread.start()
     stderr_thread.start()
@@ -1178,6 +1366,7 @@ def stop_ui_server():
         try:
             logger.info("Stopping UI server...")
             ui_process.terminate()
+
             try:
                 ui_process.wait(timeout=10)
                 logger.info("✓ UI server stopped gracefully")
@@ -1186,8 +1375,10 @@ def stop_ui_server():
                 ui_process.kill()
                 ui_process.wait()
                 logger.info("✓ UI server forcefully stopped")
+
         except Exception as e:
             logger.error(f"✗ Error stopping UI server: {e}")
+
         finally:
             ui_process = None
     else:
@@ -1246,17 +1437,23 @@ async def main():
     except KeyboardInterrupt:
         logging.getLogger(__name__).info("Received keyboard interrupt")
+
     except asyncio.CancelledError:
         logging.getLogger(__name__).info("Application cancelled")
+
     except Exception as e:
         logging.getLogger(__name__).error(f"Application error: {e}", exc_info=True)
+
     finally:
         # Clean shutdown
         logging.getLogger(__name__).info("Initiating shutdown...")
+
         # Stop UI server first
         stop_ui_server()
+
         # Then cleanup collector
         await collector.cleanup()
+
         logging.getLogger(__name__).info("Application shutdown complete")
@@ -1267,4 +1464,4 @@ if __name__ == "__main__":
         print("\nShutdown requested by user")
     except Exception as e:
         print(f"Fatal error: {e}")
-        sys.exit(1)
+        sys.exit(1)
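
Note on configuration: the setdefault() calls added during collector initialization mean the coverage enforcement is on by default even if the config file says nothing about it. To make the settings explicit, or to tune them, a technical_indicators section equivalent to the sketch below can be supplied; the surrounding config file layout is an assumption here, and only the three keys and their default values come from the patch itself.

    # Hypothetical excerpt of the collector configuration (shape assumed);
    # keys and defaults mirror the setdefault() calls added in this patch.
    technical_indicators = {
        "ensure_100_percent_coverage": True,   # enable coverage checks and automatic backfilling
        "coverage_check_interval_hours": 6,    # cadence of start_indicator_coverage_monitor()
        "backfill_batch_size": 200,            # batch size passed to db_manager.backfill_missing_indicators()
    }

Lowering coverage_check_interval_hours tightens the window in which gaps can persist, at the cost of more frequent database scans; backfill_batch_size trades memory per backfill pass against the number of passes needed.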