feat: Integrate 100% indicator coverage enforcement into data collection

- Add _ensure_indicator_coverage() to verify and backfill after data collection
- Add start_indicator_coverage_monitor() background task for periodic checks
- Configure coverage monitoring with ensure_100_percent_coverage flag
- Set coverage_check_interval_hours (default: 6 hours) for monitoring frequency
- Set backfill_batch_size (default: 200) for efficient backfilling
- Call coverage check after bulk downloads, gap fills, and candle generation
- Start indicator_coverage_monitor task in continuous collection mode
- Log coverage percentages and backfill results for transparency
- Ensure all OHLCV records have complete technical indicator coverage

This integrates the new db.py indicator coverage methods into the main
data collection workflow, ensuring 100% coverage is automatically
maintained across all symbol/interval combinations.
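
For reference, the defaults applied by the new setdefault() calls are equivalent to the following technical_indicators block (a minimal Python sketch of the effective configuration, not the literal config file; calculation_intervals already exists upstream and is shown only for context):

    config["technical_indicators"] = {
        "calculation_intervals": ["1m", "5m", "15m", "1h", "4h", "1d"],  # pre-existing key, shown for context
        "ensure_100_percent_coverage": True,   # new: toggle automatic coverage enforcement
        "coverage_check_interval_hours": 6,    # new: how often the background monitor runs
        "backfill_batch_size": 200,            # new: records per backfill batch passed to db.py
    }

Any of these keys can be overridden in the loaded configuration; the setdefault() calls only fill in missing values.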
2025-10-09 08:51:09 +01:00
parent 2708bcb176
commit e891cd7c15

main.py (253 changed lines)

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
"""
main.py - Complete Binance Trading Data Collection System
Main application entry point with async data collection, websocket handling, bulk
@@ -18,9 +19,8 @@ import os
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any, Tuple
from contextlib import asynccontextmanager
import websockets
import aiohttp # kept for future-proofing network ops
from binance.client import Client
from binance.exceptions import BinanceAPIException
import pandas as pd
@@ -34,6 +34,7 @@ from utils import (
calculate_technical_indicators, validate_symbol, format_timestamp
)
# Load environment variables
load_dotenv('variables.env')
@@ -60,7 +61,6 @@ class BinanceDataCollector:
max_gap_fills = int(os.getenv('MAX_CONCURRENT_GAP_FILLS', '2'))
self._download_semaphore = asyncio.Semaphore(max_downloads)
self._gap_fill_semaphore = asyncio.Semaphore(max_gap_fills)
self.logger.info(
f"Initialized with max {max_downloads} concurrent downloads, {max_gap_fills} gap fills"
)
@@ -71,7 +71,6 @@ class BinanceDataCollector:
# Setup logging
setup_logging()
self.logger.info("Initializing Binance Data Collector")
# Load configuration
@@ -97,6 +96,12 @@ class BinanceDataCollector:
gap.setdefault("averaging_lookback_candles", 10)
gap.setdefault("max_consecutive_empty_candles", 5)
# Add indicator coverage configuration
ind = config.setdefault("technical_indicators", {})
ind.setdefault("ensure_100_percent_coverage", True)
ind.setdefault("coverage_check_interval_hours", 6)
ind.setdefault("backfill_batch_size", 200)
self.logger.info(f"Loaded configuration for {len(config['trading_pairs'])} trading pairs") self.logger.info(f"Loaded configuration for {len(config['trading_pairs'])} trading pairs")
# Initialize database # Initialize database
@@ -107,6 +112,7 @@ class BinanceDataCollector:
# Initialize Binance client (no API key needed for market data) # Initialize Binance client (no API key needed for market data)
api_key = os.getenv('BINANCE_API_KEY') api_key = os.getenv('BINANCE_API_KEY')
secret_key = os.getenv('BINANCE_SECRET_KEY') secret_key = os.getenv('BINANCE_SECRET_KEY')
if api_key and secret_key: if api_key and secret_key:
self.client = Client(api_key, secret_key) self.client = Client(api_key, secret_key)
self.logger.info("Binance client initialized with API credentials") self.logger.info("Binance client initialized with API credentials")
@@ -124,8 +130,8 @@ class BinanceDataCollector:
starting from record_from_date for each pair, across all configured intervals. starting from record_from_date for each pair, across all configured intervals.
""" """
global config global config
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)] enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
if not enabled_pairs: if not enabled_pairs:
self.logger.warning("No enabled trading pairs found for bulk backfill") self.logger.warning("No enabled trading pairs found for bulk backfill")
return return
@@ -140,6 +146,7 @@ class BinanceDataCollector:
for pair in enabled_pairs: for pair in enabled_pairs:
symbol = pair['symbol'].upper() symbol = pair['symbol'].upper()
start_iso = pair.get('record_from_date') or config["collection"]["default_record_from_date"] start_iso = pair.get('record_from_date') or config["collection"]["default_record_from_date"]
try: try:
start_dt = datetime.fromisoformat(start_iso.replace("Z", "+00:00")) start_dt = datetime.fromisoformat(start_iso.replace("Z", "+00:00"))
except Exception: except Exception:
@@ -155,6 +162,7 @@ class BinanceDataCollector:
# Execute with graceful progress logging # Execute with graceful progress logging
self.logger.info(f"Launching bulk backfill for {len(tasks)} symbols...") self.logger.info(f"Launching bulk backfill for {len(tasks)} symbols...")
results = await asyncio.gather(*tasks, return_exceptions=True) results = await asyncio.gather(*tasks, return_exceptions=True)
errors = [r for r in results if isinstance(r, Exception)] errors = [r for r in results if isinstance(r, Exception)]
if errors: if errors:
self.logger.error(f"Bulk backfill completed with {len(errors)} errors; see logs for details") self.logger.error(f"Bulk backfill completed with {len(errors)} errors; see logs for details")
@@ -172,6 +180,7 @@ class BinanceDataCollector:
bounded by the download semaphore to control exchange load. bounded by the download semaphore to control exchange load.
""" """
global config global config
async with self._download_semaphore: async with self._download_semaphore:
end_date = end_date or datetime.now(timezone.utc) end_date = end_date or datetime.now(timezone.utc)
@@ -186,6 +195,7 @@ class BinanceDataCollector:
# Spawn all intervals concurrently for this symbol # Spawn all intervals concurrently for this symbol
self.logger.info(f"Starting concurrent bulk for {symbol} on {intervals}") self.logger.info(f"Starting concurrent bulk for {symbol} on {intervals}")
interval_tasks = [ interval_tasks = [
asyncio.create_task( asyncio.create_task(
self._bulk_download_one_interval(symbol, interval, start_date, end_date), self._bulk_download_one_interval(symbol, interval, start_date, end_date),
@@ -202,7 +212,7 @@ class BinanceDataCollector:
self.download_progress[symbol]["error"] = "One or more intervals failed" self.download_progress[symbol]["error"] = "One or more intervals failed"
else: else:
self.download_progress[symbol]["status"] = "completed" self.download_progress[symbol]["status"] = "completed"
self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat() self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat()
async def _bulk_download_one_interval( async def _bulk_download_one_interval(
self, self,
@@ -218,10 +228,17 @@ class BinanceDataCollector:
sp["intervals"][interval]["status"] = "checking" sp["intervals"][interval]["status"] = "checking"
records_count = await self._collect_historical_klines(symbol, interval, start_date, end_date) records_count = await self._collect_historical_klines(symbol, interval, start_date, end_date)
if records_count > 0: if records_count > 0:
sp["intervals"][interval]["status"] = "calculating_indicators" sp["intervals"][interval]["status"] = "calculating_indicators"
sp["intervals"][interval]["records"] = records_count sp["intervals"][interval]["records"] = records_count
# Calculate indicators
await self._calculate_and_store_indicators(symbol, interval)
# Ensure 100% indicator coverage
await self._ensure_indicator_coverage(symbol, interval)
sp["intervals"][interval]["status"] = "completed" sp["intervals"][interval]["status"] = "completed"
self.logger.info(f"Completed {interval} data for {symbol} - {records_count} new records") self.logger.info(f"Completed {interval} data for {symbol} - {records_count} new records")
else: else:
@@ -256,7 +273,7 @@ class BinanceDataCollector:
# Get intervals # Get intervals
if intervals is None: if intervals is None:
intervals = config.get("collection", {}).get("candle_intervals", intervals = config.get("collection", {}).get("candle_intervals",
["1m", "5m", "15m", "1h", "4h", "1d"]) ["1m", "5m", "15m", "1h", "4h", "1d"])
# Initialize progress tracking # Initialize progress tracking
self.download_progress[symbol] = { self.download_progress[symbol] = {
@@ -272,13 +289,14 @@ class BinanceDataCollector:
# Run intervals concurrently to improve throughput for one symbol # Run intervals concurrently to improve throughput for one symbol
tasks = [ tasks = [
asyncio.create_task(self._bulk_download_one_interval(symbol, interval, start_date, end_date), asyncio.create_task(self._bulk_download_one_interval(symbol, interval, start_date, end_date),
name=f"bulk_single_{symbol}_{interval}") name=f"bulk_single_{symbol}_{interval}")
for interval in intervals for interval in intervals
] ]
await asyncio.gather(*tasks)
await asyncio.gather(*tasks)
self.download_progress[symbol]["status"] = "completed" self.download_progress[symbol]["status"] = "completed"
self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat() self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat()
except Exception as e: except Exception as e:
self.logger.error(f"Error in bulk download for {symbol}: {e}", exc_info=True) self.logger.error(f"Error in bulk download for {symbol}: {e}", exc_info=True)
self.download_progress[symbol]["status"] = "error" self.download_progress[symbol]["status"] = "error"
@@ -299,6 +317,7 @@ class BinanceDataCollector:
coverage_check = await db_manager.check_data_exists_for_range( coverage_check = await db_manager.check_data_exists_for_range(
symbol, interval, start_date, end_date symbol, interval, start_date, end_date
) )
self.logger.info( self.logger.info(
f"Data coverage for {symbol} {interval}: " f"Data coverage for {symbol} {interval}: "
f"{coverage_check['coverage_percent']:.2f}% " f"{coverage_check['coverage_percent']:.2f}% "
@@ -328,9 +347,11 @@ class BinanceDataCollector:
# Download each missing range # Download each missing range
total_new_records = 0 total_new_records = 0
for idx, time_range in enumerate(missing_ranges, 1): for idx, time_range in enumerate(missing_ranges, 1):
range_start = time_range['start'] range_start = time_range['start']
range_end = time_range['end'] range_end = time_range['end']
self.logger.info( self.logger.info(
f"Downloading range {idx}/{len(missing_ranges)}: " f"Downloading range {idx}/{len(missing_ranges)}: "
f"{range_start} to {range_end} for {symbol} {interval}" f"{range_start} to {range_end} for {symbol} {interval}"
@@ -339,7 +360,9 @@ class BinanceDataCollector:
records_in_range = await self._download_time_range( records_in_range = await self._download_time_range(
symbol, interval, range_start, range_end symbol, interval, range_start, range_end
) )
total_new_records += records_in_range total_new_records += records_in_range
self.logger.info( self.logger.info(
f"Downloaded {records_in_range} records for range {idx}/{len(missing_ranges)}" f"Downloaded {records_in_range} records for range {idx}/{len(missing_ranges)}"
) )
@@ -395,7 +418,7 @@ class BinanceDataCollector:
"q": str(kline_row[7]), "q": str(kline_row[7]),
"V": None, # taker buy base vol (optional) "V": None, # taker buy base vol (optional)
"Q": None, # taker buy quote vol (optional) "Q": None, # taker buy quote vol (optional)
"B": None # ignore "B": None # ignore
} }
} }
@@ -413,6 +436,7 @@ class BinanceDataCollector:
end_str=end_ms,
limit=limit
)
return await asyncio.to_thread(call)
async def _download_time_range(
@@ -427,9 +451,9 @@ class BinanceDataCollector:
# Resolve batch size and retry policy, prefer config then env
chunk_size = int(config.get("collection", {}).get("bulk_chunk_size",
int(os.getenv("BULK_DOWNLOAD_BATCH_SIZE", "1000"))))
max_retries = int(config.get("collection", {}).get("max_retries",
int(os.getenv("MAX_RETRIES", "3"))))
retry_delay = float(config.get("collection", {}).get("retry_delay", 1))
# Normalize time inputs
@@ -439,6 +463,7 @@ class BinanceDataCollector:
if isinstance(end_date, dt_time):
base_date = start_date.date() if isinstance(start_date, datetime) else datetime.now(timezone.utc).date()
end_date = datetime.combine(base_date, end_date)
if start_date.tzinfo is None:
start_date = start_date.replace(tzinfo=timezone.utc)
if end_date.tzinfo is None:
@@ -459,6 +484,7 @@ class BinanceDataCollector:
chunk_end = end
klines: Optional[List[List[Any]]] = None
# Try with retry policy; also handle rate-limit backoffs
for attempt in range(max_retries):
try:
@@ -470,6 +496,7 @@ class BinanceDataCollector:
limit=chunk_size
)
break
except BinanceAPIException as e:
if e.code == -1003: # Rate limit
wait_time = retry_delay * (2 ** attempt)
@@ -480,6 +507,7 @@ class BinanceDataCollector:
if attempt == max_retries - 1:
raise
await asyncio.sleep(retry_delay)
except Exception as e:
self.logger.warning(f"Attempt {attempt + 1}/{max_retries} failed for {symbol} {interval}: {e}")
if attempt == max_retries - 1:
@@ -489,10 +517,12 @@ class BinanceDataCollector:
# No data returned; advance conservatively or terminate
if not klines or len(klines) == 0:
consecutive_empty += 1
# If multiple consecutive empty chunks, assume past available history
if consecutive_empty >= 2:
self.logger.info(f"No more data available for {symbol} {interval}; ending range loop")
break
# Otherwise, advance by one chunk and continue
current_start = chunk_end + timedelta(milliseconds=1)
await asyncio.sleep(0.05)
@@ -540,6 +570,7 @@ class BinanceDataCollector:
except asyncio.CancelledError:
self.logger.info(f"Download for {symbol} {interval} cancelled")
break
except Exception as e:
self.logger.error(f"Error collecting {interval} data for {symbol}: {e}", exc_info=True)
# Backoff before continuing or aborting loop
@@ -560,7 +591,8 @@ class BinanceDataCollector:
# Check if indicators are enabled for this interval
indicator_config = config.get('technical_indicators', {})
calc_intervals = indicator_config.get('calculation_intervals',
['1m', '5m', '15m', '1h', '4h', '1d'])
if interval not in calc_intervals:
self.logger.debug(
f"Skipping indicators for {symbol} {interval} (not in calculation_intervals)"
@@ -603,12 +635,117 @@ class BinanceDataCollector:
except asyncio.CancelledError:
self.logger.info(f"Indicator calculation cancelled for {symbol} {interval}")
except Exception as e:
self.logger.error(
f"Error calculating indicators for {symbol} {interval}: {e}",
exc_info=True
)
async def _ensure_indicator_coverage(self, symbol: str, interval: str):
"""
Ensure 100% technical indicator coverage for a symbol/interval.
Checks coverage and backfills if needed.
"""
try:
indicator_config = config.get('technical_indicators', {})
# Check if 100% coverage enforcement is enabled
if not indicator_config.get('ensure_100_percent_coverage', True):
return
# Check current coverage
coverage = await db_manager.check_indicator_coverage(symbol, interval)
if coverage['coverage_percent'] < 99.9:
self.logger.warning(
f"Indicator coverage for {symbol} {interval}: {coverage['coverage_percent']:.2f}% - backfilling..."
)
# Backfill missing indicators
batch_size = indicator_config.get('backfill_batch_size', 200)
backfill_result = await db_manager.backfill_missing_indicators(
symbol, interval, batch_size=batch_size
)
if backfill_result['status'] == 'success':
self.logger.info(
f"Indicator backfill complete for {symbol} {interval}: "
f"{backfill_result['coverage_before']:.2f}% → {backfill_result['coverage_after']:.2f}% "
f"({backfill_result['indicators_added']} indicators added)"
)
else:
self.logger.error(
f"Indicator backfill failed for {symbol} {interval}: {backfill_result.get('error')}"
)
else:
self.logger.info(
f"Indicator coverage for {symbol} {interval}: {coverage['coverage_percent']:.2f}% ✓"
)
except Exception as e:
self.logger.error(
f"Error ensuring indicator coverage for {symbol} {interval}: {e}",
exc_info=True
)
async def start_indicator_coverage_monitor(self):
"""
Background task to periodically check and ensure 100% indicator coverage
for all symbol/interval combinations.
"""
global config
indicator_config = config.get('technical_indicators', {})
if not indicator_config.get('ensure_100_percent_coverage', True):
self.logger.info("Indicator coverage monitoring is disabled")
return
check_interval_hours = indicator_config.get('coverage_check_interval_hours', 6)
self.logger.info(
f"Starting indicator coverage monitor (every {check_interval_hours} hours)"
)
while self.is_collecting:
try:
# Get all enabled pairs
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
intervals = config.get('collection', {}).get('candle_intervals',
['1m', '5m', '15m', '1h', '4h', '1d'])
self.logger.info("Running scheduled indicator coverage check...")
for pair in enabled_pairs:
symbol = pair['symbol']
for interval in intervals:
try:
await self._ensure_indicator_coverage(symbol, interval)
# Small delay between checks
await asyncio.sleep(1)
except Exception as e:
self.logger.error(
f"Error checking coverage for {symbol} {interval}: {e}"
)
self.logger.info(
f"Indicator coverage check complete. Next check in {check_interval_hours} hours"
)
# Wait for next scheduled run
await asyncio.sleep(check_interval_hours * 3600)
except asyncio.CancelledError:
self.logger.info("Indicator coverage monitor cancelled")
break
except Exception as e:
self.logger.error(f"Error in indicator coverage monitor: {e}", exc_info=True)
await asyncio.sleep(3600) # Wait 1 hour on error
# ---------------------------
# Gap detection and filling
# ---------------------------
@@ -621,7 +758,6 @@ class BinanceDataCollector:
) -> Dict[str, Any]:
"""
Automatically fill gaps for a symbol
Args:
symbol: Trading pair symbol
intervals: List of intervals to fill (default: from config)
@@ -635,7 +771,7 @@ class BinanceDataCollector:
if intervals is None:
intervals = config.get('gap_filling', {}).get('intervals_to_monitor',
['1m', '5m', '15m', '1h', '4h', '1d'])
self.logger.info(f"Starting auto gap fill for {symbol} on intervals: {intervals}")
@@ -654,22 +790,24 @@ class BinanceDataCollector:
return results
record_from_date_iso = pair_config.get('record_from_date') or \
config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
_ = datetime.fromisoformat(record_from_date_iso.replace('Z', '+00:00')) # reserved
gap_config = config.get('gap_filling', {})
max_gap_size = gap_config.get('max_gap_size_candles', 1000)
max_attempts = int(gap_config.get('max_fill_attempts',
int(config.get("collection", {}).get("max_retries",
int(os.getenv("MAX_RETRIES", "3"))))))
averaging_lookback = gap_config.get('averaging_lookback_candles', 10)
max_empty_seq = gap_config.get('max_consecutive_empty_candles', 5)
for interval in intervals:
self.logger.info(f"Checking gaps for {symbol} {interval}")
# Detect gaps
gaps_info = await db_manager.detect_gaps(symbol, interval)
gaps = gaps_info.get('gaps', [])
interval_result = {
'gaps_found': len(gaps),
'gaps_filled': 0,
@@ -679,6 +817,7 @@ class BinanceDataCollector:
for gap in gaps:
missing_candles = gap['missing_candles']
# Skip if gap is too large
if missing_candles > max_gap_size:
self.logger.info(f"Skipping large gap: {missing_candles} candles")
@@ -688,6 +827,7 @@ class BinanceDataCollector:
try:
gap_start = datetime.fromisoformat(gap['gap_start'])
gap_end = datetime.fromisoformat(gap['gap_end'])
self.logger.info(f"Filling gap: {gap_start} to {gap_end}")
# Attempt multiple real fills before resorting to averaging
@@ -697,13 +837,17 @@ class BinanceDataCollector:
# Small buffer around the gap to ensure edges are covered
buffered_start = gap_start - timedelta(milliseconds=1)
buffered_end = gap_end + timedelta(milliseconds=1)
added = await self._collect_historical_klines(
symbol, interval, buffered_start, buffered_end
)
real_filled_records += added
if added > 0:
# A successful fill; break early
break
except Exception as e:
# Log and continue attempts
interval_result['errors'].append(
@@ -715,6 +859,7 @@ class BinanceDataCollector:
interval_result['gaps_filled'] += 1
results['total_gaps_filled'] += 1
self.logger.info(f"Successfully filled gap with {real_filled_records} records")
else:
# Genuine empty gap - fill with averages if enabled
if fill_genuine_gaps and gap_config.get('enable_intelligent_averaging', True):
@@ -723,6 +868,7 @@ class BinanceDataCollector:
max_empty_seq,
averaging_lookback
)
interval_result['genuine_filled'] += filled
results['total_genuine_filled'] += filled
@@ -736,10 +882,11 @@ class BinanceDataCollector:
results['intervals'][interval] = interval_result
# Calculate and store indicators after any fills
# Calculate and store indicators after any fills, then ensure coverage
if interval_result['gaps_filled'] > 0 or interval_result['genuine_filled'] > 0:
try:
await self._calculate_and_store_indicators(symbol, interval)
await self._ensure_indicator_coverage(symbol, interval)
except Exception as e:
self.logger.error(f"Error calculating indicators: {e}", exc_info=True)
@@ -754,6 +901,7 @@ class BinanceDataCollector:
async def start_auto_gap_fill_scheduler(self):
"""Start background task for automatic gap filling"""
global config
gap_config = config.get('gap_filling', {})
if not gap_config.get('enable_auto_gap_filling', False):
self.logger.info("Auto gap filling is disabled")
@@ -761,21 +909,25 @@ class BinanceDataCollector:
schedule_hours = gap_config.get('auto_fill_schedule_hours', 24)
self.logger.info(f"Starting auto gap fill scheduler (every {schedule_hours} hours)")
self.is_collecting = True
while self.is_collecting:
try:
# Get all enabled pairs
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
for pair in enabled_pairs:
symbol = pair['symbol']
self.logger.info(f"Running scheduled gap fill for {symbol}")
try:
await self.auto_fill_gaps(
symbol,
intervals=gap_config.get('intervals_to_monitor'),
fill_genuine_gaps=gap_config.get('enable_intelligent_averaging', True)
)
except Exception as e:
self.logger.error(f"Error in scheduled gap fill for {symbol}: {e}")
@@ -786,6 +938,7 @@ class BinanceDataCollector:
except asyncio.CancelledError:
self.logger.info("Auto gap fill scheduler cancelled")
break
except Exception as e:
self.logger.error(f"Error in auto gap fill scheduler: {e}", exc_info=True)
await asyncio.sleep(3600) # Wait 1 hour on error
@@ -806,6 +959,7 @@ class BinanceDataCollector:
# Create WebSocket tasks for each enabled trading pair
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
if not enabled_pairs:
self.logger.warning("No enabled trading pairs found")
return
@@ -838,12 +992,21 @@ class BinanceDataCollector:
)
running_tasks[task_name] = task
self.logger.info(f"Started {len(running_tasks)} tasks including gap fill scheduler") # Start indicator coverage monitor
task_name = "indicator_coverage_monitor"
task = asyncio.create_task(
self.start_indicator_coverage_monitor(),
name=task_name
)
running_tasks[task_name] = task
self.logger.info(f"Started {len(running_tasks)} tasks including gap fill scheduler and indicator coverage monitor")
async def _websocket_kline_stream(self, symbol: str, interval: str):
"""WebSocket stream for kline/candlestick data"""
stream_name = f"{symbol}@kline_{interval}"
uri = f"wss://stream.binance.com:9443/ws/{stream_name}"
reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5)
ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20))
ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60))
@@ -861,6 +1024,7 @@ class BinanceDataCollector:
async for message in websocket:
if not self.websocket_collection_running:
break
try:
data = json.loads(message)
@@ -871,6 +1035,7 @@ class BinanceDataCollector:
# Parse kline data
ohlcv_data = parse_kline_data(data)
# Store in database
await db_manager.insert_ohlcv_single(ohlcv_data)
@@ -882,19 +1047,24 @@ class BinanceDataCollector:
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON from {stream_name}")
except asyncio.CancelledError:
self.logger.info(f"Kline stream cancelled: {stream_name}")
break
except Exception as e:
self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True)
except websockets.exceptions.ConnectionClosed as e:
self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}")
except asyncio.CancelledError:
self.logger.info(f"Kline WebSocket cancelled for {stream_name}")
break
except Exception as e:
self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True)
finally:
# Clean up
if stream_name in websocket_connections:
@@ -908,6 +1078,7 @@ class BinanceDataCollector:
"""WebSocket stream for trade/tick data""" """WebSocket stream for trade/tick data"""
stream_name = f"{symbol}@trade" stream_name = f"{symbol}@trade"
uri = f"wss://stream.binance.com:9443/ws/{stream_name}" uri = f"wss://stream.binance.com:9443/ws/{stream_name}"
reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5) reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5)
ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20)) ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20))
ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60)) ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60))
@@ -931,8 +1102,10 @@ class BinanceDataCollector:
async for message in websocket: async for message in websocket:
if not self.websocket_collection_running: if not self.websocket_collection_running:
break break
try: try:
data = json.loads(message) data = json.loads(message)
if data.get('e') == 'trade': if data.get('e') == 'trade':
# Parse trade data # Parse trade data
tick_data = parse_trade_data(data) tick_data = parse_trade_data(data)
@@ -945,9 +1118,11 @@ class BinanceDataCollector:
except json.JSONDecodeError: except json.JSONDecodeError:
self.logger.error(f"Invalid JSON from {stream_name}") self.logger.error(f"Invalid JSON from {stream_name}")
except asyncio.CancelledError: except asyncio.CancelledError:
self.logger.info(f"Trade stream cancelled: {stream_name}") self.logger.info(f"Trade stream cancelled: {stream_name}")
break break
except Exception as e: except Exception as e:
self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True) self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True)
@@ -957,11 +1132,14 @@ class BinanceDataCollector:
except websockets.exceptions.ConnectionClosed as e:
self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}")
except asyncio.CancelledError:
self.logger.info(f"Trade WebSocket cancelled for {stream_name}")
break
except Exception as e:
self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True)
finally:
# Clean up
if stream_name in websocket_connections:
@@ -985,12 +1163,12 @@ class BinanceDataCollector:
for task_name, task in list(running_tasks.items()):
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
self.logger.info(f"Cancelled task: {task_name}")
except Exception as e:
self.logger.error(f"Error cancelling task {task_name}: {e}")
# Close WebSocket connections
for conn_name, conn in list(websocket_connections.items()):
@@ -1002,6 +1180,7 @@ class BinanceDataCollector:
running_tasks.clear()
websocket_connections.clear()
self.logger.info("Continuous data collection stopped")
# ---------------------------
@@ -1027,6 +1206,7 @@ class BinanceDataCollector:
# Get tick data from database
ticks = await db_manager.get_tick_data(symbol, start_time, end_time)
if not ticks:
self.logger.warning(f"No tick data found for {symbol}")
return
@@ -1043,11 +1223,13 @@ class BinanceDataCollector:
'low': 'min',
'close': 'last'
})
volume = df['quantity'].resample(interval).sum()
trade_count = df.resample(interval).size()
# Combine data
candles: List[Dict[str, Any]] = []
for timestamp, row in ohlcv.iterrows():
if pd.notna(row['open']): # Skip empty periods
candle = {
@@ -1069,8 +1251,11 @@ class BinanceDataCollector:
if candles:
await db_manager.insert_ohlcv_batch(candles)
self.logger.info(f"Generated and stored {len(candles)} candles for {symbol} {interval}")
# Calculate technical indicators
# Calculate technical indicators and ensure coverage
await self._calculate_and_store_indicators(symbol, interval)
await self._ensure_indicator_coverage(symbol, interval)
else:
self.logger.warning(f"No candles generated for {symbol} {interval}")
@@ -1111,6 +1296,7 @@ def start_ui_server():
# Get UI configuration from environment
host = os.getenv("WEB_HOST", "0.0.0.0")
port = os.getenv("WEB_PORT", "8000")
logger.info(f"Starting UI server on {host}:{port}")
# Start ui.py as a subprocess
@@ -1121,6 +1307,7 @@ def start_ui_server():
text=True,
bufsize=1
)
logger.info(f"✓ UI server started with PID: {ui_process.pid}")
# Start a thread to log UI output
@@ -1161,6 +1348,7 @@ def start_ui_server():
stdout_thread = threading.Thread(target=log_ui_output, daemon=True)
stderr_thread = threading.Thread(target=log_ui_stderr, daemon=True)
stdout_thread.start()
stderr_thread.start()
@@ -1178,6 +1366,7 @@ def stop_ui_server():
try:
logger.info("Stopping UI server...")
ui_process.terminate()
try:
ui_process.wait(timeout=10)
logger.info("✓ UI server stopped gracefully")
@@ -1186,8 +1375,10 @@ def stop_ui_server():
ui_process.kill()
ui_process.wait()
logger.info("✓ UI server forcefully stopped")
except Exception as e:
logger.error(f"✗ Error stopping UI server: {e}")
finally:
ui_process = None
else:
@@ -1246,17 +1437,23 @@ async def main():
except KeyboardInterrupt:
logging.getLogger(__name__).info("Received keyboard interrupt")
except asyncio.CancelledError:
logging.getLogger(__name__).info("Application cancelled")
except Exception as e:
logging.getLogger(__name__).error(f"Application error: {e}", exc_info=True)
finally:
# Clean shutdown
logging.getLogger(__name__).info("Initiating shutdown...")
# Stop UI server first
stop_ui_server()
# Then cleanup collector
await collector.cleanup()
logging.getLogger(__name__).info("Application shutdown complete")
@@ -1267,4 +1464,4 @@ if __name__ == "__main__":
print("\nShutdown requested by user")
except Exception as e:
print(f"Fatal error: {e}")
sys.exit(1)