#!/usr/bin/env python3
"""
main.py - Complete Binance Trading Data Collection System
Main application entry point with async data collection, websocket handling, and task management
"""
import asyncio
import logging
import signal
import sys
import json
import subprocess
import os
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any
from contextlib import asynccontextmanager
import websockets
import aiohttp
from binance.client import Client
from binance.exceptions import BinanceAPIException
import pandas as pd
import pandas_ta as ta
from dotenv import load_dotenv
# Import our modules
from db import DatabaseManager
from utils import (
load_config, setup_logging, parse_kline_data, parse_trade_data,
calculate_technical_indicators, validate_symbol, format_timestamp
)
# Load environment variables
load_dotenv('variables.env')
# Global variables
db_manager: Optional[DatabaseManager] = None
config: Dict[str, Any] = {}
running_tasks: Dict[str, asyncio.Task] = {}
websocket_connections: Dict[str, Any] = {}
ui_process: Optional[subprocess.Popen] = None
class BinanceDataCollector:
"""Main data collection orchestrator for Binance trading data"""
def __init__(self):
self.client: Optional[Client] = None
self.logger = logging.getLogger(__name__)
self.is_collecting = False
self.websocket_collection_running = False
self.download_progress: Dict[str, Any] = {}
max_downloads = int(os.getenv('MAX_CONCURRENT_DOWNLOADS', '3'))
max_gap_fills = int(os.getenv('MAX_CONCURRENT_GAP_FILLS', '2'))
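        # Illustrative overrides in variables.env (names taken from the getenv calls above,
        # values are examples only):
        #   MAX_CONCURRENT_DOWNLOADS=5
        #   MAX_CONCURRENT_GAP_FILLS=3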
self._download_semaphore = asyncio.Semaphore(max_downloads)
self._gap_fill_semaphore = asyncio.Semaphore(max_gap_fills)
self.logger.info(f"Initialized with max {max_downloads} concurrent downloads, {max_gap_fills} gap fills")
async def initialize(self):
"""Initialize the data collector"""
global db_manager, config
# Setup logging
setup_logging()
self.logger.info("Initializing Binance Data Collector")
# Load configuration
config = load_config()
self.logger.info(f"Loaded configuration for {len(config['trading_pairs'])} trading pairs")
# Initialize database
db_manager = DatabaseManager()
await db_manager.initialize()
self.logger.info("Database initialized successfully")
# Initialize Binance client (no API key needed for market data)
api_key = os.getenv('BINANCE_API_KEY')
secret_key = os.getenv('BINANCE_SECRET_KEY')
if api_key and secret_key:
self.client = Client(api_key, secret_key)
self.logger.info("Binance client initialized with API credentials")
else:
self.client = Client()
self.logger.info("Binance client initialized without API credentials (public data only)")
async def bulk_download_historical_data(
self, symbol: str, start_date: datetime, end_date: Optional[datetime] = None,
intervals: Optional[List[str]] = None
):
"""
Bulk download historical OHLCV data from Binance with intelligent gap detection.
Only downloads data that doesn't already exist in the database.
"""
async with self._download_semaphore:
if end_date is None:
end_date = datetime.now(timezone.utc)
# Ensure timezone awareness
if start_date.tzinfo is None:
start_date = start_date.replace(tzinfo=timezone.utc)
if end_date.tzinfo is None:
end_date = end_date.replace(tzinfo=timezone.utc)
self.logger.info(f"Starting intelligent bulk download for {symbol} from {start_date} to {end_date}")
# Get intervals
if intervals is None:
intervals = config.get("collection", {}).get("candle_intervals", ["1m", "5m", "15m", "1h", "4h", "1d"])
# Initialize progress tracking
self.download_progress[symbol] = {
"status": "running",
"intervals": {},
"start_time": datetime.now(timezone.utc).isoformat()
}
for interval in intervals:
self.download_progress[symbol]["intervals"][interval] = {
"status": "pending",
"records": 0
}
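            # Resulting progress structure (illustrative):
            #   {"BTCUSDT": {"status": "running", "start_time": "...",
            #                "intervals": {"1m": {"status": "pending", "records": 0}, ...}}}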
try:
for interval in intervals:
self.logger.info(f"Processing {interval} data for {symbol}")
self.download_progress[symbol]["intervals"][interval]["status"] = "checking"
# Intelligent download - only missing data
records_count = await self._collect_historical_klines(
symbol, interval, start_date, end_date
)
if records_count > 0:
self.download_progress[symbol]["intervals"][interval]["status"] = "calculating_indicators"
self.download_progress[symbol]["intervals"][interval]["records"] = records_count
# Calculate indicators for new data
await self._calculate_and_store_indicators(symbol, interval)
self.download_progress[symbol]["intervals"][interval]["status"] = "completed"
self.logger.info(f"Completed {interval} data for {symbol} - {records_count} new records")
else:
self.download_progress[symbol]["intervals"][interval]["status"] = "skipped_complete"
self.logger.info(f"Skipped {interval} for {symbol} - data already complete")
self.download_progress[symbol]["status"] = "completed"
self.download_progress[symbol]["end_time"] = datetime.now(timezone.utc).isoformat()
except Exception as e:
self.logger.error(f"Error in bulk download for {symbol}: {e}")
self.download_progress[symbol]["status"] = "error"
self.download_progress[symbol]["error"] = str(e)
raise
async def _collect_historical_klines(
self, symbol: str, interval: str, start_date: datetime, end_date: datetime
) -> int:
"""
Intelligently collect historical kline data, only downloading missing ranges.
Returns:
Number of NEW records collected (not including already existing data)
"""
global db_manager
# Check if data already exists for this range
coverage_check = await db_manager.check_data_exists_for_range(
symbol, interval, start_date, end_date
)
self.logger.info(
f"Data coverage for {symbol} {interval}: "
f"{coverage_check['coverage_percent']:.2f}% "
f"({coverage_check['count']}/{coverage_check['expected_count']} records)"
)
# If coverage is complete, skip download
if coverage_check['is_complete']:
self.logger.info(
f"Skipping {symbol} {interval} - data already complete "
f"({coverage_check['coverage_percent']:.2f}% coverage)"
)
return 0
# Get missing time ranges that need to be downloaded
missing_ranges = await db_manager.get_missing_time_ranges(
symbol, interval, start_date, end_date
)
if not missing_ranges:
self.logger.info(f"No missing data ranges for {symbol} {interval}")
return 0
self.logger.info(
f"Found {len(missing_ranges)} missing time range(s) for {symbol} {interval}"
)
# Download each missing range
total_new_records = 0
for idx, time_range in enumerate(missing_ranges, 1):
range_start = time_range['start']
range_end = time_range['end']
self.logger.info(
f"Downloading range {idx}/{len(missing_ranges)}: "
f"{range_start} to {range_end} for {symbol} {interval}"
)
records_in_range = await self._download_time_range(
symbol, interval, range_start, range_end
)
total_new_records += records_in_range
self.logger.info(
f"Downloaded {records_in_range} records for range {idx}/{len(missing_ranges)}"
)
return total_new_records
def _calculate_chunk_end(self, start: datetime, interval: str, chunk_size: int) -> datetime:
"""Calculate the end time for a data chunk based on interval"""
if interval.endswith('m'):
minutes = int(interval[:-1])
return start + timedelta(minutes=minutes * chunk_size)
elif interval.endswith('h'):
hours = int(interval[:-1])
return start + timedelta(hours=hours * chunk_size)
elif interval.endswith('d'):
days = int(interval[:-1])
return start + timedelta(days=days * chunk_size)
elif interval.endswith('w'):
weeks = int(interval[:-1])
return start + timedelta(weeks=weeks * chunk_size)
else:
# Default to minutes
return start + timedelta(minutes=chunk_size)
@staticmethod
def _rest_kline_to_ws_event(symbol: str, interval: str, kline_row: List[Any]) -> Dict[str, Any]:
"""
Convert REST get_historical_klines row (list) to a WebSocket-style kline event
that parse_kline_data expects.
"""
# Per Binance REST klines: index meanings
# 0 open time(ms),1 open,2 high,3 low,4 close,5 volume,
# 6 close time(ms),7 quote asset volume,8 number of trades,
# 9 taker buy base asset volume,10 taker buy quote asset volume,11 ignore
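        # Illustrative mapping: a REST row such as
        #   [1696118400000, "27000.0", "27100.0", "26950.0", "27050.0", "12.3",
        #    1696118459999, "332715.0", 150, "6.1", "165000.0", "0"]
        # becomes {"e": "kline", "s": "BTCUSDT", "k": {"i": "1m", "o": "27000.0", "x": True, ...}}
        # (symbol, interval, and numbers above are example values only).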
return {
"e": "kline",
"E": int(kline_row[6]), # event time (approx close time)
"s": symbol.upper(),
"k": {
"t": int(kline_row[0]),
"T": int(kline_row[6]),
"s": symbol.upper(),
"i": interval,
"f": None, # first trade id (unknown from REST row)
"L": None, # last trade id (unknown)
"o": str(kline_row[1]),
"c": str(kline_row[4]),
"h": str(kline_row[2]),
"l": str(kline_row[3]),
"v": str(kline_row[5]),
"n": int(kline_row[8]),
"x": True, # REST klines are for closed candles
"q": str(kline_row[7]),
"V": None, # taker buy base asset volume (optional)
"Q": None, # taker buy quote asset volume (optional)
"B": None # ignore
}
}
async def _download_time_range(
self, symbol: str, interval: str, start_date: datetime, end_date: datetime
) -> int:
"""
Download data for a specific time range (internal method).
This is the actual download logic extracted from the original collect_historical_klines.
Returns:
Number of records downloaded and inserted
"""
global config, db_manager
chunk_size = config.get("collection", {}).get("bulk_chunk_size", 1000)
max_retries = config.get("collection", {}).get("max_retries", 3)
retry_delay = config.get("collection", {}).get("retry_delay", 1)
# Normalize time inputs that might be naive time objects
from datetime import time as dt_time
if isinstance(start_date, dt_time):
# Use today's date in UTC for safety if only a time is provided
start_date = datetime.combine(datetime.now(timezone.utc).date(), start_date)
if isinstance(end_date, dt_time):
# Use the same date as start_date if possible for consistency
base_date = start_date.date() if isinstance(start_date, datetime) else datetime.now(timezone.utc).date()
end_date = datetime.combine(base_date, end_date)
if start_date.tzinfo is None:
start_date = start_date.replace(tzinfo=timezone.utc)
if end_date.tzinfo is None:
end_date = end_date.replace(tzinfo=timezone.utc)
        # Keep datetimes timezone-aware in UTC so .timestamp() below is unambiguous
        # (calling .timestamp() on a naive datetime would use the machine's local timezone)
        current_start = start_date.astimezone(timezone.utc)
        end = end_date.astimezone(timezone.utc)
total_records = 0
retry_count = 0
while current_start < end:
try:
# Calculate chunk end time based on interval
chunk_end = self._calculate_chunk_end(current_start, interval, chunk_size)
chunk_end = min(chunk_end, end)
# Get klines from Binance with retry logic
klines: Optional[List[List[Any]]] = None
for attempt in range(max_retries):
try:
klines = self.client.get_historical_klines(
symbol=symbol,
interval=interval,
start_str=int(current_start.timestamp() * 1000),
end_str=int(chunk_end.timestamp() * 1000),
limit=chunk_size
)
break
except BinanceAPIException as e:
if e.code == -1003: # Rate limit
wait_time = retry_delay * (2 ** attempt)
self.logger.warning(f"Rate limit hit, waiting {wait_time}s before retry")
await asyncio.sleep(wait_time)
else:
raise
except Exception as e:
if attempt == max_retries - 1:
raise
self.logger.warning(f"Attempt {attempt + 1} failed: {e}")
await asyncio.sleep(retry_delay)
if not klines or len(klines) == 0:
self.logger.info(f"No more data available for {symbol} {interval}")
break
# Parse and store klines
ohlcv_data: List[Dict[str, Any]] = []
for kline in klines:
try:
# Normalize to WebSocket-style event expected by parse_kline_data
ws_event = self._rest_kline_to_ws_event(symbol, interval, kline)
parsed_data = parse_kline_data(ws_event)
ohlcv_data.append(parsed_data)
except Exception as e:
# Keep original message to aid debugging if structure differs
self.logger.error(f"Error parsing kline data: {e} | raw={kline!r}")
continue
# Batch insert to database
if ohlcv_data:
await db_manager.insert_ohlcv_batch(ohlcv_data)
total_records += len(ohlcv_data)
# Update progress
if symbol in self.download_progress and interval in self.download_progress[symbol]["intervals"]:
self.download_progress[symbol]["intervals"][interval]["records"] = total_records
self.logger.debug(f"Stored {len(ohlcv_data)} {interval} candles for {symbol} (total: {total_records})")
# Update current_start for next chunk
if klines:
last_close_time_ms = klines[-1][6] # Use the close time of the last kline
                    current_start = datetime.fromtimestamp((last_close_time_ms + 1) / 1000, tz=timezone.utc)
else:
break
# Delay to respect rate limits
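                # (~5 REST calls per second; together with the retry backoff this is
                # intended to stay comfortably under Binance's request-weight limits)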
await asyncio.sleep(0.2)
retry_count = 0 # Reset retry count on success
except BinanceAPIException as e:
retry_count += 1
self.logger.error(f"Binance API error (attempt {retry_count}): {e}")
if retry_count >= max_retries:
self.logger.error(f"Max retries reached for {symbol} {interval}")
raise
# Exponential backoff
wait_time = retry_delay * (2 ** retry_count)
await asyncio.sleep(wait_time)
except asyncio.CancelledError:
self.logger.info(f"Download for {symbol} {interval} cancelled")
break
except Exception as e:
self.logger.error(f"Error collecting {interval} data for {symbol}: {e}", exc_info=True)
raise
return total_records
async def _calculate_and_store_indicators(self, symbol: str, interval: str):
"""Calculate and store technical indicators for a symbol and interval"""
try:
# Check if indicators are enabled for this interval
indicator_config = config.get('technical_indicators', {})
calc_intervals = indicator_config.get('calculation_intervals', ['1m', '5m', '15m', '1h', '4h', '1d'])
if interval not in calc_intervals:
self.logger.debug(f"Skipping indicators for {symbol} {interval} (not in calculation_intervals)")
return
# Get OHLCV data from database (need enough for longest indicator period)
max_period = 200 # Maximum period for indicators like SMA-200
ohlcv_data = await db_manager.get_ohlcv_data(symbol, interval, limit=max_period + 50)
if len(ohlcv_data) < 50: # Need minimum data for indicators
self.logger.warning(f"Not enough data for indicators: {symbol} {interval} ({len(ohlcv_data)} records)")
return
# Convert to DataFrame
df = pd.DataFrame(ohlcv_data)
df['time'] = pd.to_datetime(df['time'])
df = df.sort_values('time')
df.set_index('time', inplace=True)
# Rename columns for pandas_ta
df = df.rename(columns={
'open_price': 'open',
'high_price': 'high',
'low_price': 'low',
'close_price': 'close'
})
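            # Columns are renamed to the lowercase open/high/low/close names the
            # indicator helper operates on; the volume column is assumed to already
            # be named 'volume' in the rows returned by get_ohlcv_data.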
# Calculate technical indicators
indicators_data = calculate_technical_indicators(df, indicator_config)
# Store indicators in database
if indicators_data:
await db_manager.insert_indicators_batch(symbol, interval, indicators_data)
self.logger.info(f"Stored {len(indicators_data)} indicator values for {symbol} {interval}")
except asyncio.CancelledError:
self.logger.info(f"Indicator calculation cancelled for {symbol} {interval}")
except Exception as e:
self.logger.error(f"Error calculating indicators for {symbol} {interval}: {e}", exc_info=True)
async def auto_fill_gaps(
self,
symbol: str,
intervals: Optional[List[str]] = None,
fill_genuine_gaps: bool = True
) -> Dict[str, Any]:
"""
Automatically fill gaps for a symbol
Args:
symbol: Trading pair symbol
intervals: List of intervals to fill (default: from config)
fill_genuine_gaps: Whether to fill genuine empty gaps with averages
Returns:
Dictionary with fill results
"""
# Acquire semaphore to limit concurrent gap fills
async with self._gap_fill_semaphore:
global config, db_manager
if intervals is None:
intervals = config.get('gap_filling', {}).get('intervals_to_monitor', ['1m', '5m', '15m', '1h', '4h', '1d'])
self.logger.info(f"Starting auto gap fill for {symbol} on intervals: {intervals}")
results: Dict[str, Any] = {
'symbol': symbol,
'intervals': {},
'total_gaps_filled': 0,
'total_genuine_filled': 0
}
try:
# Get record_from_date for this symbol
pair_config = next((p for p in config['trading_pairs'] if p['symbol'] == symbol), None)
if not pair_config:
self.logger.warning(f"Symbol {symbol} not found in config")
return results
record_from_date = pair_config.get('record_from_date')
if not record_from_date:
record_from_date = config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
_ = datetime.fromisoformat(record_from_date.replace('Z', '+00:00')) # kept for future use
gap_config = config.get('gap_filling', {})
max_gap_size = gap_config.get('max_gap_size_candles', 1000)
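                # Oversized gaps are skipped below; they can be filled with an explicit
                # bulk_download_historical_data run instead of a long automatic fill.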
for interval in intervals:
self.logger.info(f"Checking gaps for {symbol} {interval}")
# Detect gaps
gaps_info = await db_manager.detect_gaps(symbol, interval)
gaps = gaps_info.get('gaps', [])
interval_result = {
'gaps_found': len(gaps),
'gaps_filled': 0,
'genuine_filled': 0,
'errors': []
}
# Fill downloadable gaps
for gap in gaps:
missing_candles = gap['missing_candles']
# Skip if gap is too large
if missing_candles > max_gap_size:
self.logger.info(f"Skipping large gap: {missing_candles} candles")
interval_result['errors'].append(f"Gap too large: {missing_candles} candles")
continue
try:
# Download missing data
gap_start = datetime.fromisoformat(gap['gap_start'])
gap_end = datetime.fromisoformat(gap['gap_end'])
self.logger.info(f"Filling gap: {gap_start} to {gap_end}")
records_count = await self._collect_historical_klines(
symbol, interval, gap_start, gap_end
)
if records_count > 0:
interval_result['gaps_filled'] += 1
results['total_gaps_filled'] += 1
self.logger.info(f"Successfully filled gap with {records_count} records")
else:
# Genuine empty gap - fill with averages if enabled
if fill_genuine_gaps:
filled = await db_manager.fill_genuine_gaps_with_averages(
symbol, interval,
gap_config.get('max_consecutive_empty_candles', 5),
gap_config.get('averaging_lookback_candles', 10)
)
interval_result['genuine_filled'] += filled
results['total_genuine_filled'] += filled
# Small delay between gaps
await asyncio.sleep(0.5)
except Exception as e:
error_msg = f"Error filling gap: {str(e)}"
self.logger.error(error_msg)
interval_result['errors'].append(error_msg)
results['intervals'][interval] = interval_result
# Calculate and store indicators after filling gaps
if interval_result['gaps_filled'] > 0 or interval_result['genuine_filled'] > 0:
try:
await self._calculate_and_store_indicators(symbol, interval)
except Exception as e:
self.logger.error(f"Error calculating indicators: {e}", exc_info=True)
self.logger.info(f"Auto gap fill completed for {symbol}: {results}")
return results
except Exception as e:
self.logger.error(f"Error in auto gap fill: {e}", exc_info=True)
results['error'] = str(e)
return results
async def start_auto_gap_fill_scheduler(self):
"""Start background task for automatic gap filling"""
global config, db_manager
gap_config = config.get('gap_filling', {})
if not gap_config.get('enable_auto_gap_filling', False):
self.logger.info("Auto gap filling is disabled")
return
schedule_hours = gap_config.get('auto_fill_schedule_hours', 24)
self.logger.info(f"Starting auto gap fill scheduler (every {schedule_hours} hours)")
while self.is_collecting:
try:
# Get all enabled pairs
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
for pair in enabled_pairs:
symbol = pair['symbol']
self.logger.info(f"Running scheduled gap fill for {symbol}")
try:
await self.auto_fill_gaps(
symbol,
intervals=gap_config.get('intervals_to_monitor'),
fill_genuine_gaps=gap_config.get('enable_intelligent_averaging', True)
)
except Exception as e:
self.logger.error(f"Error in scheduled gap fill for {symbol}: {e}")
# Wait for next scheduled run
self.logger.info(f"Next auto gap fill in {schedule_hours} hours")
await asyncio.sleep(schedule_hours * 3600)
except asyncio.CancelledError:
self.logger.info("Auto gap fill scheduler cancelled")
break
except Exception as e:
self.logger.error(f"Error in auto gap fill scheduler: {e}", exc_info=True)
await asyncio.sleep(3600) # Wait 1 hour on error
async def start_continuous_collection(self):
"""Start continuous data collection via WebSocket"""
if self.websocket_collection_running:
self.logger.warning("WebSocket collection already running")
return
self.logger.info("Starting continuous WebSocket data collection")
self.websocket_collection_running = True
self.is_collecting = True
# Create WebSocket tasks for each enabled trading pair
enabled_pairs = [p for p in config['trading_pairs'] if p.get('enabled', True)]
if not enabled_pairs:
self.logger.warning("No enabled trading pairs found")
return
for pair_config in enabled_pairs:
symbol = pair_config['symbol'].lower()
# Start kline streams for configured intervals
for interval in config['collection']['candle_intervals']:
task_name = f"kline_{symbol}_{interval}"
task = asyncio.create_task(
self._websocket_kline_stream(symbol, interval),
name=task_name
)
running_tasks[task_name] = task
# Start trade stream for tick data
task_name = f"trade_{symbol}"
task = asyncio.create_task(
self._websocket_trade_stream(symbol),
name=task_name
)
running_tasks[task_name] = task
# Start auto gap fill scheduler
task_name = "auto_gap_fill_scheduler"
task = asyncio.create_task(
self.start_auto_gap_fill_scheduler(),
name=task_name
)
running_tasks[task_name] = task
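        # Task count = (enabled pairs x candle intervals) kline streams
        #            + one trade stream per pair + the gap fill scheduler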
self.logger.info(f"Started {len(running_tasks)} tasks including gap fill scheduler")
async def _websocket_kline_stream(self, symbol: str, interval: str):
"""WebSocket stream for kline/candlestick data"""
stream_name = f"{symbol}@kline_{interval}"
uri = f"wss://stream.binance.com:9443/ws/{stream_name}"
reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5)
ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20))
ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60))
while self.websocket_collection_running:
try:
async with websockets.connect(
uri,
ping_interval=ping_interval,
ping_timeout=ping_timeout
) as websocket:
self.logger.info(f"Connected to {stream_name}")
websocket_connections[stream_name] = websocket
async for message in websocket:
if not self.websocket_collection_running:
break
try:
data = json.loads(message)
# Validate event type and payload shape
if data.get('e') != 'kline' or 'k' not in data:
self.logger.debug(f"Ignored non-kline or malformed message on {stream_name}")
continue
# Parse kline data
ohlcv_data = parse_kline_data(data)
# Store in database
await db_manager.insert_ohlcv_single(ohlcv_data)
# Calculate indicators if kline is closed
if data['k'].get('x'):
await self._calculate_and_store_indicators(
symbol.upper(), interval
)
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON from {stream_name}")
except asyncio.CancelledError:
self.logger.info(f"Kline stream cancelled: {stream_name}")
break
except Exception as e:
self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True)
except websockets.exceptions.ConnectionClosed as e:
self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}")
except asyncio.CancelledError:
self.logger.info(f"Kline WebSocket cancelled for {stream_name}")
break
except Exception as e:
self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True)
finally:
# Clean up
if stream_name in websocket_connections:
websocket_connections.pop(stream_name, None)
if self.websocket_collection_running:
self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...")
await asyncio.sleep(reconnect_delay)
async def _websocket_trade_stream(self, symbol: str):
"""WebSocket stream for trade/tick data"""
stream_name = f"{symbol}@trade"
uri = f"wss://stream.binance.com:9443/ws/{stream_name}"
reconnect_delay = config.get('collection', {}).get('websocket_reconnect_delay', 5)
ping_interval = int(os.getenv('WEBSOCKET_PING_INTERVAL', 20))
ping_timeout = int(os.getenv('WEBSOCKET_PING_TIMEOUT', 60))
while self.websocket_collection_running:
try:
async with websockets.connect(
uri,
ping_interval=ping_interval,
ping_timeout=ping_timeout
) as websocket:
self.logger.info(f"Connected to {stream_name}")
websocket_connections[stream_name] = websocket
tick_batch: List[Dict[str, Any]] = []
batch_size = config.get('collection', {}).get('tick_batch_size', 100)
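                    # Ticks are buffered and written in batches to limit database
                    # round-trips; any partial batch is flushed when the stream ends (below).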
async for message in websocket:
if not self.websocket_collection_running:
break
try:
data = json.loads(message)
if data.get('e') == 'trade':
# Parse trade data
tick_data = parse_trade_data(data)
tick_batch.append(tick_data)
# Batch insert when batch is full
if len(tick_batch) >= batch_size:
await db_manager.insert_ticks_batch(tick_batch)
tick_batch = []
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON from {stream_name}")
except asyncio.CancelledError:
self.logger.info(f"Trade stream cancelled: {stream_name}")
break
except Exception as e:
self.logger.error(f"Error processing {stream_name} message: {e}", exc_info=True)
# Insert remaining ticks
if tick_batch:
await db_manager.insert_ticks_batch(tick_batch)
except websockets.exceptions.ConnectionClosed as e:
self.logger.warning(f"WebSocket connection closed for {stream_name}: {e}")
except asyncio.CancelledError:
self.logger.info(f"Trade WebSocket cancelled for {stream_name}")
break
except Exception as e:
self.logger.error(f"WebSocket error for {stream_name}: {e}", exc_info=True)
finally:
# Clean up
if stream_name in websocket_connections:
websocket_connections.pop(stream_name, None)
if self.websocket_collection_running:
self.logger.info(f"Reconnecting to {stream_name} in {reconnect_delay}s...")
await asyncio.sleep(reconnect_delay)
async def stop_continuous_collection(self):
"""Stop continuous data collection"""
if not self.websocket_collection_running:
self.logger.warning("WebSocket collection not running")
return
self.logger.info("Stopping continuous data collection")
self.websocket_collection_running = False
self.is_collecting = False
# Cancel all running tasks
for task_name, task in list(running_tasks.items()):
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
self.logger.info(f"Cancelled task: {task_name}")
except Exception as e:
self.logger.error(f"Error cancelling task {task_name}: {e}")
# Close WebSocket connections
for conn_name, conn in list(websocket_connections.items()):
try:
await conn.close()
self.logger.info(f"Closed WebSocket: {conn_name}")
except Exception as e:
self.logger.error(f"Error closing WebSocket {conn_name}: {e}")
running_tasks.clear()
websocket_connections.clear()
self.logger.info("Continuous data collection stopped")
async def generate_candles_from_ticks(
self,
symbol: str,
interval: str,
start_time: datetime,
end_time: datetime
):
"""
Generate OHLCV candles from tick data
Args:
symbol: Trading pair symbol
interval: Candle interval (e.g., '1m', '5m', '1h')
start_time: Start time for candle generation
end_time: End time for candle generation
"""
self.logger.info(f"Generating {interval} candles from ticks for {symbol}")
# Get tick data from database
ticks = await db_manager.get_tick_data(symbol, start_time, end_time)
if not ticks:
self.logger.warning(f"No tick data found for {symbol}")
return
# Convert to DataFrame
df = pd.DataFrame(ticks)
df['time'] = pd.to_datetime(df['time'])
df.set_index('time', inplace=True)
        # Map the Binance interval notation to a pandas resample frequency
        # ('1m' -> '1min'; hour/day intervals pass through unchanged)
        freq = interval.replace('m', 'min') if interval.endswith('m') else interval
        # Resample ticks into OHLC candles; .ohlc() yields open/high/low/close columns
        ohlcv = df['price'].resample(freq).ohlc()
        volume = df['quantity'].resample(freq).sum()
        trade_count = df.resample(freq).size()
# Combine data
candles: List[Dict[str, Any]] = []
for timestamp, row in ohlcv.iterrows():
if pd.notna(row['open']): # Skip empty periods
candle = {
'time': timestamp,
'symbol': symbol,
'exchange': 'binance',
'interval': interval,
'open_price': float(row['open']),
'high_price': float(row['high']),
'low_price': float(row['low']),
'close_price': float(row['close']),
'volume': float(volume.loc[timestamp]) if timestamp in volume.index else 0.0,
'quote_volume': None,
'trade_count': int(trade_count.loc[timestamp]) if timestamp in trade_count.index else 0
}
candles.append(candle)
# Store candles in database
if candles:
await db_manager.insert_ohlcv_batch(candles)
self.logger.info(f"Generated and stored {len(candles)} candles for {symbol} {interval}")
# Calculate technical indicators
await self._calculate_and_store_indicators(symbol, interval)
else:
self.logger.warning(f"No candles generated for {symbol} {interval}")
    async def get_download_progress(self, symbol: Optional[str] = None) -> Dict[str, Any]:
"""Get download progress for a symbol or all symbols"""
if symbol:
return self.download_progress.get(symbol, {'status': 'not_found'})
return self.download_progress
async def cleanup(self):
"""Clean up resources"""
await self.stop_continuous_collection()
# db_manager may have a close method; guard if absent
try:
if db_manager and hasattr(db_manager, "close"):
await db_manager.close()
except Exception as e:
self.logger.warning(f"Error closing database manager: {e}")
self.logger.info("BinanceDataCollector cleanup complete")
def start_ui_server():
"""Start the UI server as a subprocess"""
global ui_process
logger = logging.getLogger(__name__)
try:
# Get UI configuration from environment
host = os.getenv("WEB_HOST", "0.0.0.0")
port = os.getenv("WEB_PORT", "8000")
logger.info(f"Starting UI server on {host}:{port}")
# Start ui.py as a subprocess
ui_process = subprocess.Popen(
[sys.executable, "ui.py"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
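        # text=True with bufsize=1 gives line-buffered string output, so the reader
        # threads below can forward UI log lines as soon as they are printed.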
logger.info(f"✓ UI server started with PID: {ui_process.pid}")
# Start a thread to log UI output
import threading
def parse_log_level(line: str) -> int:
"""Parse the log level from a log line"""
# Check for standard Python logging format
if ' - ERROR - ' in line or 'ERROR:' in line:
return logging.ERROR
elif ' - WARNING - ' in line or 'WARNING:' in line or ' - WARN - ' in line:
return logging.WARNING
elif ' - DEBUG - ' in line or 'DEBUG:' in line:
return logging.DEBUG
else:
# Default to INFO for all other lines (including INFO: and standard messages)
return logging.INFO
def log_ui_output():
"""Log UI stdout messages"""
if not ui_process.stdout:
return
for line in ui_process.stdout:
line = line.rstrip()
if line: # Only log non-empty lines
log_level = parse_log_level(line)
logger.log(log_level, f"[UI] {line}")
def log_ui_stderr():
"""Log UI stderr messages with intelligent level detection"""
if not ui_process.stderr:
return
for line in ui_process.stderr:
line = line.rstrip()
if line: # Only log non-empty lines
log_level = parse_log_level(line)
logger.log(log_level, f"[UI] {line}")
stdout_thread = threading.Thread(target=log_ui_output, daemon=True)
stderr_thread = threading.Thread(target=log_ui_stderr, daemon=True)
stdout_thread.start()
stderr_thread.start()
except Exception as e:
logger.error(f"✗ Failed to start UI server: {e}")
raise
def stop_ui_server():
"""Stop the UI server subprocess"""
global ui_process
logger = logging.getLogger(__name__)
if ui_process:
try:
logger.info("Stopping UI server...")
ui_process.terminate()
try:
ui_process.wait(timeout=10)
logger.info("✓ UI server stopped gracefully")
except subprocess.TimeoutExpired:
logger.warning("UI server didn't stop gracefully, forcing...")
ui_process.kill()
ui_process.wait()
logger.info("✓ UI server forcefully stopped")
except Exception as e:
logger.error(f"✗ Error stopping UI server: {e}")
finally:
ui_process = None
else:
logger.debug("UI server process not running")
# Global signal handlers
def signal_handler(signum, frame):
"""Handle shutdown signals"""
logger = logging.getLogger(__name__)
logger.info(f"Received signal {signum}, initiating shutdown...")
# Stop UI server
stop_ui_server()
# Cancel all running tasks
for task in asyncio.all_tasks():
task.cancel()
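    # Cancelling the tasks unwinds asyncio.run(main()); main()'s finally block then
    # stops the UI server and runs collector.cleanup().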
async def main():
"""Main application entry point"""
# Setup signal handlers
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
collector = BinanceDataCollector()
try:
# Initialize the collector
await collector.initialize()
# Start UI server
start_ui_server()
# Start continuous collection
await collector.start_continuous_collection()
# Keep the application running
while collector.websocket_collection_running:
await asyncio.sleep(1)
except KeyboardInterrupt:
logging.getLogger(__name__).info("Received keyboard interrupt")
except asyncio.CancelledError:
logging.getLogger(__name__).info("Application cancelled")
except Exception as e:
logging.getLogger(__name__).error(f"Application error: {e}", exc_info=True)
finally:
# Clean shutdown
logging.getLogger(__name__).info("Initiating shutdown...")
# Stop UI server first
stop_ui_server()
# Then cleanup collector
await collector.cleanup()
logging.getLogger(__name__).info("Application shutdown complete")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nShutdown requested by user")
except Exception as e:
print(f"Fatal error: {e}")
sys.exit(1)