diff --git a/main.py b/main.py
index 4b939e2..3fa7ce7 100644
--- a/main.py
+++ b/main.py
@@ -1,8 +1,11 @@
 #!/usr/bin/env python3
+
 """
 main.py - Complete Binance Trading Data Collection System
-Main application entry point with async data collection, websocket handling, and task management
+Main application entry point with async data collection, websocket handling, bulk
+backfill orchestration, periodic gap scans, and task management.
+
 """
 
 import asyncio
@@ -13,11 +16,11 @@ import json
 import subprocess
 import os
 from datetime import datetime, timedelta, timezone
-from typing import Dict, List, Optional, Any
+from typing import Dict, List, Optional, Any, Tuple
 from contextlib import asynccontextmanager
 
 import websockets
-import aiohttp
+import aiohttp  # kept for future network operations
 from binance.client import Client
 from binance.exceptions import BinanceAPIException
 import pandas as pd
@@ -52,11 +55,15 @@ class BinanceDataCollector:
         self.websocket_collection_running = False
         self.download_progress: Dict[str, Any] = {}
 
+        # Concurrency controls from env with sensible defaults
         max_downloads = int(os.getenv('MAX_CONCURRENT_DOWNLOADS', '3'))
         max_gap_fills = int(os.getenv('MAX_CONCURRENT_GAP_FILLS', '2'))
         self._download_semaphore = asyncio.Semaphore(max_downloads)
         self._gap_fill_semaphore = asyncio.Semaphore(max_gap_fills)
-        self.logger.info(f"Initialized with max {max_downloads} concurrent downloads, {max_gap_fills} gap fills")
+
+        self.logger.info(
+            f"Initialized with max {max_downloads} concurrent downloads, {max_gap_fills} gap fills"
+        )
 
     async def initialize(self):
         """Initialize the data collector"""
@@ -64,10 +71,32 @@ class BinanceDataCollector:
         # Setup logging
         setup_logging()
+        self.logger.info("Initializing Binance Data Collector")
 
         # Load configuration
         config = load_config()
+
+        # Optional: add defaults for new flags
+        coll = config.setdefault("collection", {})
+        coll.setdefault("default_record_from_date", "2020-01-01T00:00:00Z")
+        coll.setdefault("initial_full_backfill", True)
+        coll.setdefault("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"])
+        coll.setdefault("bulk_chunk_size", int(os.getenv("BULK_DOWNLOAD_BATCH_SIZE", "1000")))
+        coll.setdefault("tick_batch_size", int(os.getenv("TICK_BATCH_SIZE", "100")))
+        coll.setdefault("max_retries", int(os.getenv("MAX_RETRIES", "3")))
+        coll.setdefault("retry_delay", 1)
+
+        gap = config.setdefault("gap_filling", {})
+        gap.setdefault("enable_auto_gap_filling", True)
+        gap.setdefault("auto_fill_schedule_hours", 24)
+        gap.setdefault("enable_intelligent_averaging", True)
+        gap.setdefault("max_fill_attempts", int(os.getenv("MAX_RETRIES", "3")))
+        gap.setdefault("intervals_to_monitor", coll.get("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"]))
+        gap.setdefault("max_gap_size_candles", 1000)
+        gap.setdefault("averaging_lookback_candles", 10)
+        gap.setdefault("max_consecutive_empty_candles", 5)
+
         self.logger.info(f"Loaded configuration for {len(config['trading_pairs'])} trading pairs")
 
         # Initialize database
@@ -85,6 +114,124 @@ class BinanceDataCollector:
             self.client = Client()
             self.logger.info("Binance client initialized without API credentials (public data only)")
 
+    # ---------------------------
+    # Bulk backfill orchestration
+    # ---------------------------
+
+    async def start_bulk_download_for_all_pairs(self):
+        """
+        Automatically launch full-history downloads for all enabled pairs,
+        starting from record_from_date for each pair, across all configured intervals.
+        """
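+        # For orientation, a minimal sketch of the pair entry shape this method
+        # assumes in config['trading_pairs'] (only 'symbol' is required; the
+        # other fields are read with .get() and fall back to defaults):
+        #
+        #     {
+        #         "symbol": "BTCUSDT",
+        #         "enabled": true,
+        #         "priority": 1,                               # 1 = highest
+        #         "record_from_date": "2021-01-01T00:00:00Z"
+        #     }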
+ """ + global config + + enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)] + if not enabled_pairs: + self.logger.warning("No enabled trading pairs found for bulk backfill") + return + + # Sort by priority ascending (1 higher priority), then by symbol for stable order + enabled_pairs.sort(key=lambda p: (int(p.get('priority', 1)), p.get('symbol', ''))) + + # Build tasks per symbol to respect MAX_CONCURRENT_DOWNLOADS at the symbol level + tasks: List[asyncio.Task] = [] + now_utc = datetime.now(timezone.utc) + + for pair in enabled_pairs: + symbol = pair['symbol'].upper() + start_iso = pair.get('record_from_date') or config["collection"]["default_record_from_date"] + try: + start_dt = datetime.fromisoformat(start_iso.replace("Z", "+00:00")) + except Exception: + self.logger.warning(f"Invalid record_from_date for {symbol}: {start_iso}, falling back to default") + start_dt = datetime.fromisoformat(config["collection"]["default_record_from_date"].replace("Z", "+00:00")) + + # One task per symbol to execute all intervals concurrently for that symbol + tasks.append(asyncio.create_task( + self._bulk_download_symbol_all_intervals(symbol, start_dt, now_utc), + name=f"bulk_{symbol}" + )) + + # Execute with graceful progress logging + self.logger.info(f"Launching bulk backfill for {len(tasks)} symbols...") + results = await asyncio.gather(*tasks, return_exceptions=True) + errors = [r for r in results if isinstance(r, Exception)] + if errors: + self.logger.error(f"Bulk backfill completed with {len(errors)} errors; see logs for details") + else: + self.logger.info("Bulk backfill completed successfully for all symbols") + + async def _bulk_download_symbol_all_intervals( + self, + symbol: str, + start_date: datetime, + end_date: Optional[datetime] = None + ): + """ + Launch concurrent downloads of all configured intervals for one symbol, + bounded by the download semaphore to control exchange load. 
+ """ + global config + async with self._download_semaphore: + end_date = end_date or datetime.now(timezone.utc) + + # Ensure progress structure + intervals = config.get("collection", {}).get("candle_intervals", + ["1m", "5m", "15m", "1h", "4h", "1d"]) + self.download_progress[symbol] = { + "status": "running", + "intervals": {iv: {"status": "pending", "records": 0} for iv in intervals}, + "start_time": datetime.now(timezone.utc).isoformat() + } + + # Spawn all intervals concurrently for this symbol + self.logger.info(f"Starting concurrent bulk for {symbol} on {intervals}") + interval_tasks = [ + asyncio.create_task( + self._bulk_download_one_interval(symbol, interval, start_date, end_date), + name=f"bulk_{symbol}_{interval}" + ) + for interval in intervals + ] + + results = await asyncio.gather(*interval_tasks, return_exceptions=True) + + # Mark final status + if any(isinstance(r, Exception) for r in results): + self.download_progress[symbol]["status"] = "error" + self.download_progress[symbol]["error"] = "One or more intervals failed" + else: + self.download_progress[symbol]["status"] = "completed" + self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat() + + async def _bulk_download_one_interval( + self, + symbol: str, + interval: str, + start_date: datetime, + end_date: datetime + ): + """Run the bulk downloader for a single symbol+interval and then compute indicators.""" + # Update status + sp = self.download_progress.setdefault(symbol, {"intervals": {}}) + sp["intervals"].setdefault(interval, {"status": "pending", "records": 0}) + sp["intervals"][interval]["status"] = "checking" + + records_count = await self._collect_historical_klines(symbol, interval, start_date, end_date) + if records_count > 0: + sp["intervals"][interval]["status"] = "calculating_indicators" + sp["intervals"][interval]["records"] = records_count + await self._calculate_and_store_indicators(symbol, interval) + sp["intervals"][interval]["status"] = "completed" + self.logger.info(f"Completed {interval} data for {symbol} - {records_count} new records") + else: + sp["intervals"][interval]["status"] = "skipped_complete" + self.logger.info(f"Skipped {interval} for {symbol} - data already complete or no new records") + + # --------------------------- + # Intelligent bulk downloader + # --------------------------- + async def bulk_download_historical_data( self, symbol: str, start_date: datetime, end_date: Optional[datetime] = None, intervals: Optional[List[str]] = None @@ -92,6 +239,7 @@ class BinanceDataCollector: """ Bulk download historical OHLCV data from Binance with intelligent gap detection. Only downloads data that doesn't already exist in the database. + Note: kept for API/UI compatibility; orchestration now prefers start_bulk_download_for_all_pairs. 
""" async with self._download_semaphore: if end_date is None: @@ -107,7 +255,8 @@ class BinanceDataCollector: # Get intervals if intervals is None: - intervals = config.get("collection", {}).get("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"]) + intervals = config.get("collection", {}).get("candle_intervals", + ["1m", "5m", "15m", "1h", "4h", "1d"]) # Initialize progress tracking self.download_progress[symbol] = { @@ -115,38 +264,23 @@ class BinanceDataCollector: "intervals": {}, "start_time": datetime.now(timezone.utc).isoformat() } + for interval in intervals: - self.download_progress[symbol]["intervals"][interval] = { - "status": "pending", - "records": 0 - } + self.download_progress[symbol]["intervals"][interval] = {"status": "pending", "records": 0} try: - for interval in intervals: - self.logger.info(f"Processing {interval} data for {symbol}") - self.download_progress[symbol]["intervals"][interval]["status"] = "checking" - - # Intelligent download - only missing data - records_count = await self._collect_historical_klines( - symbol, interval, start_date, end_date - ) - - if records_count > 0: - self.download_progress[symbol]["intervals"][interval]["status"] = "calculating_indicators" - self.download_progress[symbol]["intervals"][interval]["records"] = records_count - - # Calculate indicators for new data - await self._calculate_and_store_indicators(symbol, interval) - self.download_progress[symbol]["intervals"][interval]["status"] = "completed" - self.logger.info(f"Completed {interval} data for {symbol} - {records_count} new records") - else: - self.download_progress[symbol]["intervals"][interval]["status"] = "skipped_complete" - self.logger.info(f"Skipped {interval} for {symbol} - data already complete") + # Run intervals concurrently to improve throughput for one symbol + tasks = [ + asyncio.create_task(self._bulk_download_one_interval(symbol, interval, start_date, end_date), + name=f"bulk_single_{symbol}_{interval}") + for interval in intervals + ] + await asyncio.gather(*tasks) self.download_progress[symbol]["status"] = "completed" self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat() except Exception as e: - self.logger.error(f"Error in bulk download for {symbol}: {e}") + self.logger.error(f"Error in bulk download for {symbol}: {e}", exc_info=True) self.download_progress[symbol]["status"] = "error" self.download_progress[symbol]["error"] = str(e) raise @@ -172,7 +306,7 @@ class BinanceDataCollector: ) # If coverage is complete, skip download - if coverage_check['is_complete']: + if coverage_check.get('is_complete'): self.logger.info( f"Skipping {symbol} {interval} - data already complete " f"({coverage_check['coverage_percent']:.2f}% coverage)" @@ -183,6 +317,7 @@ class BinanceDataCollector: missing_ranges = await db_manager.get_missing_time_ranges( symbol, interval, start_date, end_date ) + if not missing_ranges: self.logger.info(f"No missing data ranges for {symbol} {interval}") return 0 @@ -205,7 +340,6 @@ class BinanceDataCollector: symbol, interval, range_start, range_end ) total_new_records += records_in_range - self.logger.info( f"Downloaded {records_in_range} records for range {idx}/{len(missing_ranges)}" ) @@ -249,104 +383,132 @@ class BinanceDataCollector: "T": int(kline_row[6]), "s": symbol.upper(), "i": interval, - "f": None, # first trade id (unknown from REST row) - "L": None, # last trade id (unknown) + "f": None, # first trade id + "L": None, # last trade id "o": str(kline_row[1]), "c": str(kline_row[4]), "h": 
+
+    async def _get_historical_klines_async(
+        self, symbol: str, interval: str, start_ms: int, end_ms: int, limit: int
+    ) -> List[List[Any]]:
+        """
+        Run python-binance get_historical_klines in a worker thread to avoid blocking the event loop.
+        """
+        def call():
+            return self.client.get_historical_klines(
+                symbol=symbol,
+                interval=interval,
+                start_str=start_ms,
+                end_str=end_ms,
+                limit=limit
+            )
+        return await asyncio.to_thread(call)
+
     async def _download_time_range(
         self, symbol: str, interval: str,
         start_date: datetime, end_date: datetime
     ) -> int:
         """
         Download data for a specific time range (internal method).
-        This is the actual download logic extracted from the original collect_historical_klines.
 
         Returns:
             Number of records downloaded and inserted
         """
         global config, db_manager
 
-        chunk_size = config.get("collection", {}).get("bulk_chunk_size", 1000)
-        max_retries = config.get("collection", {}).get("max_retries", 3)
-        retry_delay = config.get("collection", {}).get("retry_delay", 1)
+        # Resolve batch size and retry policy, preferring config, then env
+        chunk_size = int(config.get("collection", {}).get("bulk_chunk_size",
+                                                          int(os.getenv("BULK_DOWNLOAD_BATCH_SIZE", "1000"))))
+        max_retries = int(config.get("collection", {}).get("max_retries",
+                                                           int(os.getenv("MAX_RETRIES", "3"))))
+        retry_delay = float(config.get("collection", {}).get("retry_delay", 1))
 
-        # Normalize time inputs that might be naive time objects
+        # Normalize time inputs
         from datetime import time as dt_time
         if isinstance(start_date, dt_time):
-            # Use today's date in UTC for safety if only a time is provided
             start_date = datetime.combine(datetime.now(timezone.utc).date(), start_date)
         if isinstance(end_date, dt_time):
-            # Use the same date as start_date if possible for consistency
             base_date = start_date.date() if isinstance(start_date, datetime) else datetime.now(timezone.utc).date()
             end_date = datetime.combine(base_date, end_date)
-
         if start_date.tzinfo is None:
             start_date = start_date.replace(tzinfo=timezone.utc)
         if end_date.tzinfo is None:
             end_date = end_date.replace(tzinfo=timezone.utc)
 
-        # Convert to naive UTC for Binance API
-        current_start = start_date.replace(tzinfo=None)
-        end = end_date.replace(tzinfo=None)
+        # Work in tz-aware UTC; timestamps are passed to Binance as epoch milliseconds
+        current_start = start_date.astimezone(timezone.utc)
+        end = end_date.astimezone(timezone.utc)
 
         total_records = 0
-        retry_count = 0
+        consecutive_empty = 0
 
         while current_start < end:
             try:
                 # Calculate chunk end time based on interval
                 chunk_end = self._calculate_chunk_end(current_start, interval, chunk_size)
-                chunk_end = min(chunk_end, end)
+                if chunk_end > end:
+                    chunk_end = end
 
                 klines: Optional[List[List[Any]]] = None
+                # Try with retry policy; also handle rate-limit backoffs
                 for attempt in range(max_retries):
                     try:
-                        klines = self.client.get_historical_klines(
+                        klines = await self._get_historical_klines_async(
                             symbol=symbol,
                             interval=interval,
-                            start_str=int(current_start.timestamp() * 1000),
-                            end_str=int(chunk_end.timestamp() * 1000),
+                            start_ms=int(current_start.timestamp() * 1000),
+                            end_ms=int(chunk_end.timestamp() * 1000),
                             limit=chunk_size
                         )
                         break
                     except BinanceAPIException as e:
                         if e.code == -1003:  # Rate limit
                             wait_time = retry_delay * (2 ** attempt)
-                            self.logger.warning(f"Rate limit hit, waiting {wait_time}s before retry")
+                            self.logger.warning(f"Rate limit hit for {symbol} {interval}, waiting {wait_time}s")
                             await asyncio.sleep(wait_time)
                         else:
-                            raise
+                            self.logger.error(f"Binance API exception for {symbol} {interval}: {e}")
+                            if attempt == max_retries - 1:
+                                raise
+                            await asyncio.sleep(retry_delay)
                     except Exception as e:
+                        self.logger.warning(f"Attempt {attempt + 1}/{max_retries} failed for {symbol} {interval}: {e}")
                         if attempt == max_retries - 1:
                             raise
-                        self.logger.warning(f"Attempt {attempt + 1} failed: {e}")
                         await asyncio.sleep(retry_delay)
 
+                # No data returned; advance conservatively or terminate
                 if not klines or len(klines) == 0:
-                    self.logger.info(f"No more data available for {symbol} {interval}")
-                    break
+                    consecutive_empty += 1
+                    # After multiple consecutive empty chunks, assume we are past available history
+                    if consecutive_empty >= 2:
+                        self.logger.info(f"No more data available for {symbol} {interval}; ending range loop")
+                        break
+                    # Otherwise, advance by one chunk and continue
+                    current_start = chunk_end + timedelta(milliseconds=1)
+                    await asyncio.sleep(0.05)
+                    continue
+
+                # Reset empty counter on success
+                consecutive_empty = 0
 
                 # Parse and store klines
                 ohlcv_data: List[Dict[str, Any]] = []
                 for kline in klines:
                     try:
-                        # Normalize to WebSocket-style event expected by parse_kline_data
                        ws_event = self._rest_kline_to_ws_event(symbol, interval, kline)
                         parsed_data = parse_kline_data(ws_event)
                         ohlcv_data.append(parsed_data)
                     except Exception as e:
-                        # Keep original message to aid debugging if structure differs
                         self.logger.error(f"Error parsing kline data: {e} | raw={kline!r}")
                         continue
@@ -356,57 +518,63 @@ class BinanceDataCollector:
                     total_records += len(ohlcv_data)
 
                     # Update progress
-                    if symbol in self.download_progress and interval in self.download_progress[symbol]["intervals"]:
+                    if symbol in self.download_progress and \
+                       interval in self.download_progress[symbol].get("intervals", {}):
                         self.download_progress[symbol]["intervals"][interval]["records"] = total_records
 
-                    self.logger.debug(f"Stored {len(ohlcv_data)} {interval} candles for {symbol} (total: {total_records})")
+                    self.logger.debug(
+                        f"Stored {len(ohlcv_data)} {interval} candles for {symbol} (total: {total_records})"
+                    )
 
-                # Update current_start for next chunk
+                # Update current_start based on the last candle's close time
                 if klines:
-                    last_close_time_ms = klines[-1][6]  # Use the close time of the last kline
-                    current_start = datetime.utcfromtimestamp((last_close_time_ms + 1) / 1000)
+                    last_close_time_ms = int(klines[-1][6])  # close time in ms
+                    # Advance 1 ms past the last close to avoid re-fetching the same candle
+                    current_start = datetime.fromtimestamp((last_close_time_ms + 1) / 1000, tz=timezone.utc)
                 else:
                     break
 
-                # Delay to respect rate limits
-                await asyncio.sleep(0.2)
-                retry_count = 0  # Reset retry count on success
-
-            except BinanceAPIException as e:
-                retry_count += 1
-                self.logger.error(f"Binance API error (attempt {retry_count}): {e}")
-                if retry_count >= max_retries:
-                    self.logger.error(f"Max retries reached for {symbol} {interval}")
-                    raise
-                # Exponential backoff
-                wait_time = retry_delay * (2 ** retry_count)
-                await asyncio.sleep(wait_time)
+                # Light delay to avoid hammering the API
+                await asyncio.sleep(0.05)
 
             except asyncio.CancelledError:
                 self.logger.info(f"Download for {symbol} {interval} cancelled")
                 break
-
             except Exception as e:
                 self.logger.error(f"Error collecting {interval} data for {symbol}: {e}", exc_info=True)
-                raise
+                # Back off, then keep attempting the next iteration rather than hard-stopping;
+                # persistent failures will surface again through the REST retry policy above.
+                await asyncio.sleep(max(0.5, retry_delay))
 
         return total_records
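+
+    # Sizing sketch for the loop above, assuming _calculate_chunk_end advances the
+    # window by chunk_size * interval: with chunk_size=1000 and interval="1m" each
+    # request covers 1000 minutes (~16.7 h), so one year of 1m candles needs roughly
+    # 525600 / 1000 = 526 requests per symbol.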
cancelled") break - except Exception as e: self.logger.error(f"Error collecting {interval} data for {symbol}: {e}", exc_info=True) - raise + # Backoff before continuing or aborting loop + await asyncio.sleep(max(0.5, retry_delay)) + # Decide: keep attempting next loop iteration to avoid a hard stop + # If persistent errors, the retries around REST will surface again + # Optional: could break here if desired, but continuing is safer for coverage return total_records + # --------------------------- + # Technical indicators + # --------------------------- + async def _calculate_and_store_indicators(self, symbol: str, interval: str): """Calculate and store technical indicators for a symbol and interval""" try: # Check if indicators are enabled for this interval indicator_config = config.get('technical_indicators', {}) - calc_intervals = indicator_config.get('calculation_intervals', ['1m', '5m', '15m', '1h', '4h', '1d']) + calc_intervals = indicator_config.get('calculation_intervals', + ['1m', '5m', '15m', '1h', '4h', '1d']) if interval not in calc_intervals: - self.logger.debug(f"Skipping indicators for {symbol} {interval} (not in calculation_intervals)") + self.logger.debug( + f"Skipping indicators for {symbol} {interval} (not in calculation_intervals)" + ) return # Get OHLCV data from database (need enough for longest indicator period) - max_period = 200 # Maximum period for indicators like SMA-200 + max_period = 200 # SMA-200, etc. ohlcv_data = await db_manager.get_ohlcv_data(symbol, interval, limit=max_period + 50) - if len(ohlcv_data) < 50: # Need minimum data for indicators - self.logger.warning(f"Not enough data for indicators: {symbol} {interval} ({len(ohlcv_data)} records)") + + if len(ohlcv_data) < 50: + self.logger.warning( + f"Not enough data for indicators: {symbol} {interval} ({len(ohlcv_data)} records)" + ) return # Convert to DataFrame @@ -429,11 +597,21 @@ class BinanceDataCollector: # Store indicators in database if indicators_data: await db_manager.insert_indicators_batch(symbol, interval, indicators_data) - self.logger.info(f"Stored {len(indicators_data)} indicator values for {symbol} {interval}") + self.logger.info( + f"Stored {len(indicators_data)} indicator values for {symbol} {interval}" + ) + except asyncio.CancelledError: self.logger.info(f"Indicator calculation cancelled for {symbol} {interval}") except Exception as e: - self.logger.error(f"Error calculating indicators for {symbol} {interval}: {e}", exc_info=True) + self.logger.error( + f"Error calculating indicators for {symbol} {interval}: {e}", + exc_info=True + ) + + # --------------------------- + # Gap detection and filling + # --------------------------- async def auto_fill_gaps( self, @@ -443,6 +621,7 @@ class BinanceDataCollector: ) -> Dict[str, Any]: """ Automatically fill gaps for a symbol + Args: symbol: Trading pair symbol intervals: List of intervals to fill (default: from config) @@ -455,9 +634,11 @@ class BinanceDataCollector: global config, db_manager if intervals is None: - intervals = config.get('gap_filling', {}).get('intervals_to_monitor', ['1m', '5m', '15m', '1h', '4h', '1d']) + intervals = config.get('gap_filling', {}).get('intervals_to_monitor', + ['1m', '5m', '15m', '1h', '4h', '1d']) self.logger.info(f"Starting auto gap fill for {symbol} on intervals: {intervals}") + results: Dict[str, Any] = { 'symbol': symbol, 'intervals': {}, @@ -472,20 +653,23 @@ class BinanceDataCollector: self.logger.warning(f"Symbol {symbol} not found in config") return results - record_from_date = 
 
     async def auto_fill_gaps(
         self,
@@ -443,6 +621,7 @@ class BinanceDataCollector:
     ) -> Dict[str, Any]:
         """
         Automatically fill gaps for a symbol
+
         Args:
             symbol: Trading pair symbol
             intervals: List of intervals to fill (default: from config)
@@ -455,9 +634,11 @@ class BinanceDataCollector:
         global config, db_manager
 
         if intervals is None:
-            intervals = config.get('gap_filling', {}).get('intervals_to_monitor', ['1m', '5m', '15m', '1h', '4h', '1d'])
+            intervals = config.get('gap_filling', {}).get('intervals_to_monitor',
+                                                          ['1m', '5m', '15m', '1h', '4h', '1d'])
 
         self.logger.info(f"Starting auto gap fill for {symbol} on intervals: {intervals}")
+
         results: Dict[str, Any] = {
             'symbol': symbol,
             'intervals': {},
@@ -472,20 +653,23 @@ class BinanceDataCollector:
             self.logger.warning(f"Symbol {symbol} not found in config")
             return results
 
-        record_from_date = pair_config.get('record_from_date')
-        if not record_from_date:
-            record_from_date = config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
-        _ = datetime.fromisoformat(record_from_date.replace('Z', '+00:00'))  # kept for future use
+        record_from_date_iso = pair_config.get('record_from_date') or \
+            config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
+        _ = datetime.fromisoformat(record_from_date_iso.replace('Z', '+00:00'))  # parsed only to validate; reserved
 
         gap_config = config.get('gap_filling', {})
         max_gap_size = gap_config.get('max_gap_size_candles', 1000)
+        max_attempts = int(gap_config.get('max_fill_attempts',
+                                          int(config.get("collection", {}).get("max_retries",
+                                                                               int(os.getenv("MAX_RETRIES", "3"))))))
+        averaging_lookback = gap_config.get('averaging_lookback_candles', 10)
+        max_empty_seq = gap_config.get('max_consecutive_empty_candles', 5)
 
         for interval in intervals:
             self.logger.info(f"Checking gaps for {symbol} {interval}")
 
             # Detect gaps
             gaps_info = await db_manager.detect_gaps(symbol, interval)
             gaps = gaps_info.get('gaps', [])
-
             interval_result = {
                 'gaps_found': len(gaps),
                 'gaps_filled': 0,
@@ -493,7 +677,6 @@ class BinanceDataCollector:
                 'errors': []
             }
 
-            # Fill downloadable gaps
             for gap in gaps:
                 missing_candles = gap['missing_candles']
                 # Skip if gap is too large
@@ -503,32 +686,48 @@ class BinanceDataCollector:
                     continue
 
                 try:
-                    # Download missing data
                     gap_start = datetime.fromisoformat(gap['gap_start'])
                     gap_end = datetime.fromisoformat(gap['gap_end'])
 
                     self.logger.info(f"Filling gap: {gap_start} to {gap_end}")
 
-                    records_count = await self._collect_historical_klines(
-                        symbol, interval, gap_start, gap_end
-                    )
+                    # Attempt multiple real fills before resorting to averaging
+                    real_filled_records = 0
+                    for attempt in range(1, max_attempts + 1):
+                        try:
+                            # Small buffer around the gap to ensure the edges are covered
+                            buffered_start = gap_start - timedelta(milliseconds=1)
+                            buffered_end = gap_end + timedelta(milliseconds=1)
+                            added = await self._collect_historical_klines(
+                                symbol, interval, buffered_start, buffered_end
+                            )
+                            real_filled_records += added
+                            if added > 0:
+                                # A successful fill; stop retrying
+                                break
+                        except Exception as e:
+                            # Log and continue with the next attempt
+                            interval_result['errors'].append(
+                                f"Attempt {attempt} failed for gap {gap_start}->{gap_end}: {e}"
+                            )
+                            await asyncio.sleep(0.5 * attempt)
 
-                    if records_count > 0:
+                    if real_filled_records > 0:
                         interval_result['gaps_filled'] += 1
                         results['total_gaps_filled'] += 1
-                        self.logger.info(f"Successfully filled gap with {records_count} records")
+                        self.logger.info(f"Successfully filled gap with {real_filled_records} records")
                     else:
                         # Genuine empty gap - fill with averages if enabled
-                        if fill_genuine_gaps:
+                        if fill_genuine_gaps and gap_config.get('enable_intelligent_averaging', True):
                             filled = await db_manager.fill_genuine_gaps_with_averages(
                                 symbol, interval,
-                                gap_config.get('max_consecutive_empty_candles', 5),
-                                gap_config.get('averaging_lookback_candles', 10)
+                                max_empty_seq,
+                                averaging_lookback
                             )
                             interval_result['genuine_filled'] += filled
                             results['total_genuine_filled'] += filled
 
                     # Small delay between gaps
-                    await asyncio.sleep(0.5)
+                    await asyncio.sleep(0.2)
 
                 except Exception as e:
                     error_msg = f"Error filling gap: {str(e)}"
@@ -537,7 +736,7 @@ class BinanceDataCollector:
 
             results['intervals'][interval] = interval_result
 
-            # Calculate and store indicators after filling gaps
+            # Calculate and store indicators after any fills
             if interval_result['gaps_filled'] > 0 or interval_result['genuine_filled'] > 0:
                 try:
                     await self._calculate_and_store_indicators(symbol, interval)
@@ -554,7 +753,7 @@ class BinanceDataCollector:
 
     async def start_auto_gap_fill_scheduler(self):
         """Start background task for automatic gap filling"""
-        global config, db_manager
+        global config
         gap_config = config.get('gap_filling', {})
         if not gap_config.get('enable_auto_gap_filling', False):
             self.logger.info("Auto gap filling is disabled")
@@ -562,6 +761,7 @@ class BinanceDataCollector:
 
         schedule_hours = gap_config.get('auto_fill_schedule_hours', 24)
         self.logger.info(f"Starting auto gap fill scheduler (every {schedule_hours} hours)")
+
         self.is_collecting = True
         while self.is_collecting:
             try:
@@ -590,6 +790,10 @@ class BinanceDataCollector:
                 self.logger.error(f"Error in auto gap fill scheduler: {e}", exc_info=True)
                 await asyncio.sleep(3600)  # Wait 1 hour on error
 
+    # ---------------------------
+    # WebSocket continuous streams
+    # ---------------------------
+
     async def start_continuous_collection(self):
         """Start continuous data collection via WebSocket"""
         if self.websocket_collection_running:
@@ -667,7 +871,6 @@ class BinanceDataCollector:
 
                         # Parse kline data
                         ohlcv_data = parse_kline_data(data)
-
                         # Store in database
                         await db_manager.insert_ohlcv_single(ohlcv_data)
 
@@ -697,9 +900,9 @@ class BinanceDataCollector:
             if stream_name in websocket_connections:
                 websocket_connections.pop(stream_name, None)
 
-        if self.websocket_collection_running:
-            self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...")
-            await asyncio.sleep(reconnect_delay)
+            if self.websocket_collection_running:
+                self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...")
+                await asyncio.sleep(reconnect_delay)
 
     async def _websocket_trade_stream(self, symbol: str):
         """WebSocket stream for trade/tick data"""
@@ -720,7 +923,10 @@ class BinanceDataCollector:
                     websocket_connections[stream_name] = websocket
 
                     tick_batch: List[Dict[str, Any]] = []
-                    batch_size = config.get('collection', {}).get('tick_batch_size', 100)
+                    batch_size = int(config.get('collection', {}).get(
+                        'tick_batch_size',
+                        int(os.getenv("TICK_BATCH_SIZE", "100"))
+                    ))
 
                     async for message in websocket:
                         if not self.websocket_collection_running:
@@ -761,9 +967,9 @@ class BinanceDataCollector:
             if stream_name in websocket_connections:
                 websocket_connections.pop(stream_name, None)
 
-        if self.websocket_collection_running:
-            self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...")
-            await asyncio.sleep(reconnect_delay)
+            if self.websocket_collection_running:
+                self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...")
+                await asyncio.sleep(reconnect_delay)
 
     async def stop_continuous_collection(self):
         """Stop continuous data collection"""
@@ -798,6 +1004,10 @@ class BinanceDataCollector:
         websocket_connections.clear()
         self.logger.info("Continuous data collection stopped")
 
+    # ---------------------------
+    # Candle generation from ticks
+    # ---------------------------
+
     async def generate_candles_from_ticks(
         self,
         symbol: str,
@@ -864,6 +1074,10 @@ class BinanceDataCollector:
         else:
             self.logger.warning(f"No candles generated for {symbol} {interval}")
 
+    # ---------------------------
+    # Progress and cleanup
+    # ---------------------------
+
     async def get_download_progress(self, symbol: str = None) -> Dict[str, Any]:
         """Get download progress for a symbol or all symbols"""
         if symbol:
@@ -873,15 +1087,21 @@ class BinanceDataCollector:
 
     async def cleanup(self):
         """Clean up resources"""
         await self.stop_continuous_collection()
+        # db_manager may have a close method; guard if absent
         try:
             if db_manager and hasattr(db_manager, "close"):
                 await db_manager.close()
         except Exception as e:
             self.logger.warning(f"Error closing database manager: {e}")
+
+        self.logger.info("BinanceDataCollector cleanup complete")
 
 
+# ---------------------------
+# UI process management
+# ---------------------------
+
 def start_ui_server():
     """Start the UI server as a subprocess"""
     global ui_process
@@ -916,7 +1136,7 @@ def start_ui_server():
         elif ' - DEBUG - ' in line or 'DEBUG:' in line:
             return logging.DEBUG
         else:
-            # Default to INFO for all other lines (including INFO: and standard messages)
+            # Default to INFO for all other lines
             return logging.INFO
 
     def log_ui_output():
@@ -925,7 +1145,7 @@ def start_ui_server():
             return
         for line in ui_process.stdout:
             line = line.rstrip()
-            if line:  # Only log non-empty lines
+            if line:
                 log_level = parse_log_level(line)
                 logger.log(log_level, f"[UI] {line}")
 
@@ -935,7 +1155,7 @@ def start_ui_server():
            return
         for line in ui_process.stderr:
             line = line.rstrip()
-            if line:  # Only log non-empty lines
+            if line:
                 log_level = parse_log_level(line)
                 logger.log(log_level, f"[UI] {line}")
 
@@ -974,7 +1194,10 @@ def stop_ui_server():
         logger.debug("UI server process not running")
 
 
+# ---------------------------
 # Global signal handlers
+# ---------------------------
+
 def signal_handler(signum, frame):
     """Handle shutdown signals"""
     logger = logging.getLogger(__name__)
@@ -988,6 +1211,10 @@ def signal_handler(signum, frame):
             task.cancel()
 
 
+# ---------------------------
+# Main entry point
+# ---------------------------
+
 async def main():
     """Main application entry point"""
     # Setup signal handlers
@@ -1003,7 +1230,14 @@ async def main():
     # Start UI server
     start_ui_server()
 
-    # Start continuous collection
+    # Optionally kick off an initial full backfill for all configured pairs
+    if config.get("collection", {}).get("initial_full_backfill", True):
+        # Launch as a background task so the WebSocket streams can start immediately
+        task_name = "initial_full_backfill"
+        backfill_task = asyncio.create_task(collector.start_bulk_download_for_all_pairs(), name=task_name)
+        running_tasks[task_name] = backfill_task
+
+    # Start continuous collection (kline + trade streams) and gap scheduler
     await collector.start_continuous_collection()
 
     # Keep the application running
@@ -1033,4 +1267,4 @@ if __name__ == "__main__":
         print("\nShutdown requested by user")
     except Exception as e:
         print(f"Fatal error: {e}")
-        sys.exit(1)
+        sys.exit(1)