#!/usr/bin/env python3
"""
db.py - TimescaleDB Database Operations and Schema Management

Database operations, connection pooling, and schema management for
crypto trading data.
"""

import asyncio
import logging
import math
import os
import random
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone, date, time as dt_time
from typing import Dict, List, Optional, Any, Tuple

import asyncpg
from dotenv import load_dotenv

# Load environment variables
load_dotenv('variables.env')


def _utc_now() -> datetime:
    return datetime.now(timezone.utc)


def _ensure_dt_aware_utc(dt: datetime) -> datetime:
    """Coerce a datetime (or bare time) into a timezone-aware UTC datetime."""
    if isinstance(dt, dt_time):
        # Defensive: a bare time is anchored to today's UTC date.
        dt = datetime.combine(_utc_now().date(), dt)
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def _safe_upper(s: Optional[str]) -> Optional[str]:
    return s.upper() if isinstance(s, str) else s


class DatabaseManager:
    """Manages TimescaleDB operations with connection pooling."""

    def __init__(self):
        self.pool: Optional[asyncpg.Pool] = None
        self.logger = logging.getLogger(__name__)
        self._connection_semaphore: Optional[asyncio.Semaphore] = None

        # Database connection parameters
        self.db_config = {
            'host': os.getenv('DB_HOST', 'localhost'),
            'port': int(os.getenv('DB_PORT', '5432')),
            'database': os.getenv('DB_NAME', 'crypto_trading'),
            'user': os.getenv('DB_USER', 'postgres'),
            'password': os.getenv('DB_PASSWORD', 'password'),
            'min_size': int(os.getenv('DB_POOL_MIN_SIZE', '10')),
            'max_size': int(os.getenv('DB_POOL_MAX_SIZE', '50')),
            'command_timeout': int(os.getenv('DB_COMMAND_TIMEOUT', '60')),
        }

    async def initialize(self):
        """Initialize database connection pool and create tables."""
        try:
            self.logger.info("Initializing database connection pool")
            self.pool = await asyncpg.create_pool(
                host=self.db_config['host'],
                port=self.db_config['port'],
                database=self.db_config['database'],
                user=self.db_config['user'],
                password=self.db_config['password'],
                min_size=self.db_config['min_size'],
                max_size=self.db_config['max_size'],
                command_timeout=self.db_config['command_timeout'],
            )

            # Initialize semaphore to prevent connection exhaustion (20% headroom)
            max_concurrent = max(1, int(self.db_config['max_size'] * 0.8))
            self._connection_semaphore = asyncio.Semaphore(max_concurrent)
            self.logger.info(
                f"Database connection pool created successfully (max concurrent: {max_concurrent})"
            )

            # Create tables and hypertables
            await self.create_schema()
            self.logger.info("Database initialization complete")
        except Exception as e:
            self.logger.error(f"Database initialization failed: {e}", exc_info=True)
            raise

    @asynccontextmanager
    async def acquire_with_semaphore(self):
        """Acquire a connection with the semaphore to prevent pool exhaustion."""
        if self._connection_semaphore is None or self.pool is None:
            raise RuntimeError("DatabaseManager not initialized")
        async with self._connection_semaphore:
            async with self.pool.acquire() as conn:
                yield conn

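    # Usage sketch (illustrative, assuming an initialized manager named `db`):
    #
    #     db = DatabaseManager()
    #     await db.initialize()
    #     async with db.acquire_with_semaphore() as conn:
    #         row = await conn.fetchrow("SELECT 1 AS ok;")
    #
    # The semaphore admits at most 80% of max_size concurrent acquisitions, so
    # bursty callers queue here instead of exhausting the asyncpg pool.
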
    async def create_schema(self):
        """Create database schema with TimescaleDB hypertables."""
        if not self.pool:
            raise RuntimeError("Pool not initialized")
        async with self.pool.acquire() as conn:
            try:
                # Try to enable TimescaleDB extension
                try:
                    await conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;")
                    timescale_ok = True
                except Exception as e:
                    self.logger.warning(
                        f"TimescaleDB extension not available or failed to enable: {e}"
                    )
                    timescale_ok = False

                # Create tables
                await conn.execute("""
                    CREATE TABLE IF NOT EXISTS crypto_ticks (
                        time TIMESTAMPTZ NOT NULL,
                        symbol TEXT NOT NULL,
                        exchange TEXT NOT NULL DEFAULT 'binance',
                        price DECIMAL(20,8) NOT NULL,
                        quantity DECIMAL(20,8) NOT NULL,
                        trade_id BIGINT,
                        is_buyer_maker BOOLEAN,
                        PRIMARY KEY (time, symbol, trade_id)
                    );
                """)

                await conn.execute("""
                    CREATE TABLE IF NOT EXISTS crypto_ohlcv (
                        time TIMESTAMPTZ NOT NULL,
                        symbol TEXT NOT NULL,
                        exchange TEXT NOT NULL DEFAULT 'binance',
                        interval TEXT NOT NULL,
                        open_price DECIMAL(20,8) NOT NULL,
                        high_price DECIMAL(20,8) NOT NULL,
                        low_price DECIMAL(20,8) NOT NULL,
                        close_price DECIMAL(20,8) NOT NULL,
                        volume DECIMAL(20,8) NOT NULL,
                        quote_volume DECIMAL(20,8),
                        trade_count INTEGER,
                        PRIMARY KEY (time, symbol, interval)
                    );
                """)

                await conn.execute("""
                    CREATE TABLE IF NOT EXISTS technical_indicators (
                        time TIMESTAMPTZ NOT NULL,
                        symbol TEXT NOT NULL,
                        exchange TEXT NOT NULL DEFAULT 'binance',
                        interval TEXT NOT NULL,
                        indicator_name TEXT NOT NULL,
                        indicator_value DECIMAL(20,8),
                        metadata JSONB,
                        PRIMARY KEY (time, symbol, interval, indicator_name)
                    );
                """)

                # Create hypertables
                if timescale_ok:
                    try:
                        await conn.execute(
                            "SELECT create_hypertable('crypto_ticks', 'time', if_not_exists => TRUE);"
                        )
                    except asyncpg.PostgresError as e:
                        self.logger.debug(f"crypto_ticks hypertable setup note: {e}")
                    try:
                        await conn.execute(
                            "SELECT create_hypertable('crypto_ohlcv', 'time', if_not_exists => TRUE);"
                        )
                    except asyncpg.PostgresError as e:
                        self.logger.debug(f"crypto_ohlcv hypertable setup note: {e}")
                    try:
                        await conn.execute(
                            "SELECT create_hypertable('technical_indicators', 'time', if_not_exists => TRUE);"
                        )
                    except asyncpg.PostgresError as e:
                        self.logger.debug(f"technical_indicators hypertable setup note: {e}")

                # Create indexes for better query performance
                await self.create_indexes(conn)

                # Setup compression policies when possible
                if timescale_ok:
                    await self.setup_compression_policies(conn)
                else:
                    self.logger.info(
                        "Skipping compression policies because TimescaleDB extension is unavailable"
                    )

                self.logger.info("Database schema created successfully")
            except Exception as e:
                self.logger.error(f"Error creating database schema: {e}", exc_info=True)
                raise

    async def create_indexes(self, conn: asyncpg.Connection):
        """Create indexes for better query performance."""
        index_sqls = [
            # Ticks indexes
            "CREATE INDEX IF NOT EXISTS idx_crypto_ticks_symbol_time ON crypto_ticks (symbol, time DESC);",
            "CREATE INDEX IF NOT EXISTS idx_crypto_ticks_time_symbol ON crypto_ticks (time DESC, symbol);",
            # OHLCV indexes
            "CREATE INDEX IF NOT EXISTS idx_crypto_ohlcv_symbol_interval_time ON crypto_ohlcv (symbol, interval, time DESC);",
            "CREATE INDEX IF NOT EXISTS idx_crypto_ohlcv_time_symbol ON crypto_ohlcv (time DESC, symbol);",
            # Indicators indexes
            "CREATE INDEX IF NOT EXISTS idx_technical_indicators_symbol_indicator_time ON technical_indicators (symbol, indicator_name, time DESC);",
            "CREATE INDEX IF NOT EXISTS idx_technical_indicators_time_symbol ON technical_indicators (time DESC, symbol);",
        ]
        for sql in index_sqls:
            try:
                await conn.execute(sql)
            except Exception as e:
                self.logger.warning(f"Index creation warning: {e}")

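    # Verification sketch (illustrative): after create_schema() succeeds with
    # the extension enabled, hypertable and compression state can be checked
    # from psql via the standard timescaledb_information views, e.g.:
    #
    #     SELECT hypertable_name FROM timescaledb_information.hypertables;
    #     SELECT * FROM timescaledb_information.jobs
    #     WHERE proc_name = 'policy_compression';
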
    async def setup_compression_policies(self, conn: asyncpg.Connection):
        """Setup TimescaleDB compression policies."""
        try:
            compression_alters = [
                """
                ALTER TABLE crypto_ticks SET (
                    timescaledb.compress,
                    timescaledb.compress_segmentby = 'symbol,exchange',
                    timescaledb.compress_orderby = 'time DESC'
                );
                """,
                """
                ALTER TABLE crypto_ohlcv SET (
                    timescaledb.compress,
                    timescaledb.compress_segmentby = 'symbol,exchange,interval',
                    timescaledb.compress_orderby = 'time DESC'
                );
                """,
                """
                ALTER TABLE technical_indicators SET (
                    timescaledb.compress,
                    timescaledb.compress_segmentby = 'symbol,exchange,interval,indicator_name',
                    timescaledb.compress_orderby = 'time DESC'
                );
                """,
            ]
            for q in compression_alters:
                try:
                    await conn.execute(q)
                except Exception as e:
                    self.logger.warning(f"Compression setup warning: {e}")

            policies = [
                "SELECT add_compression_policy('crypto_ticks', INTERVAL '7 days');",
                "SELECT add_compression_policy('crypto_ohlcv', INTERVAL '7 days');",
                "SELECT add_compression_policy('technical_indicators', INTERVAL '7 days');",
            ]
            for q in policies:
                try:
                    await conn.execute(q)
                except Exception as e:
                    self.logger.warning(f"Compression policy warning: {e}")
        except Exception as e:
            self.logger.warning(f"Compression setup failed: {e}")

    # --------------------------
    # Insertion methods
    # --------------------------
    async def insert_tick_single(self, tick_data: Dict[str, Any]):
        """Insert a single tick record."""
        if not self.pool:
            raise RuntimeError("Pool not initialized")
        tick_time = _ensure_dt_aware_utc(tick_data['time'])
        symbol = _safe_upper(tick_data['symbol'])
        exchange = tick_data.get('exchange', 'binance')
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO crypto_ticks (time, symbol, exchange, price, quantity, trade_id, is_buyer_maker)
                VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (time, symbol, trade_id) DO NOTHING;
                """,
                tick_time,
                symbol,
                exchange,
                tick_data['price'],
                tick_data['quantity'],
                tick_data['trade_id'],
                tick_data.get('is_buyer_maker'),
            )

    async def insert_ticks_batch(self, ticks_data: List[Dict[str, Any]]):
        """Insert multiple tick records in batch."""
        if not ticks_data or not self.pool:
            return
        records: List[Tuple[Any, ...]] = []
        for t in ticks_data:
            tick_time = _ensure_dt_aware_utc(t['time'])
            symbol = _safe_upper(t['symbol'])
            exchange = t.get('exchange', 'binance')
            records.append((
                tick_time, symbol, exchange,
                t['price'], t['quantity'], t['trade_id'],
                t.get('is_buyer_maker'),
            ))
        async with self.pool.acquire() as conn:
            await conn.executemany(
                """
                INSERT INTO crypto_ticks (time, symbol, exchange, price, quantity, trade_id, is_buyer_maker)
                VALUES ($1, $2, $3, $4, $5, $6, $7)
                ON CONFLICT (time, symbol, trade_id) DO NOTHING;
                """,
                records,
            )
        self.logger.debug(f"Inserted {len(records)} tick records")

    async def insert_ohlcv_single(self, ohlcv_data: Dict[str, Any]):
        """Insert a single OHLCV record (upsert on conflict)."""
        if not self.pool:
            raise RuntimeError("Pool not initialized")
        ts = _ensure_dt_aware_utc(ohlcv_data['time'])
        symbol = _safe_upper(ohlcv_data['symbol'])
        exchange = ohlcv_data.get('exchange', 'binance')
        interval = ohlcv_data['interval']
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO crypto_ohlcv (time, symbol, exchange, interval, open_price, high_price,
                                          low_price, close_price, volume, quote_volume, trade_count)
                VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
                ON CONFLICT (time, symbol, interval) DO UPDATE SET
                    open_price = EXCLUDED.open_price,
                    high_price = EXCLUDED.high_price,
                    low_price = EXCLUDED.low_price,
                    close_price = EXCLUDED.close_price,
                    volume = EXCLUDED.volume,
                    quote_volume = EXCLUDED.quote_volume,
                    trade_count = EXCLUDED.trade_count;
                """,
                ts, symbol, exchange, interval,
                ohlcv_data['open_price'], ohlcv_data['high_price'],
                ohlcv_data['low_price'], ohlcv_data['close_price'],
                ohlcv_data['volume'], ohlcv_data.get('quote_volume'),
                ohlcv_data.get('trade_count'),
            )

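    # Payload sketch (illustrative, values are examples): insert_ticks_batch
    # expects one dict per trade, e.g.
    #
    #     await db.insert_ticks_batch([{
    #         'time': datetime.now(timezone.utc),
    #         'symbol': 'btcusdt',          # normalized to BTCUSDT on insert
    #         'price': '43000.50000000',
    #         'quantity': '0.00210000',
    #         'trade_id': 123456789,
    #         'is_buyer_maker': False,
    #     }])
    #
    # ON CONFLICT ... DO NOTHING makes re-delivery of the same trade harmless.
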
    async def insert_ohlcv_batch(self, ohlcv_data: List[Dict[str, Any]]):
        """Insert multiple OHLCV records in batch (upsert on conflict)."""
        if not ohlcv_data or not self.pool:
            return
        records: List[Tuple[Any, ...]] = []
        for c in ohlcv_data:
            ts = _ensure_dt_aware_utc(c['time'])
            symbol = _safe_upper(c['symbol'])
            exchange = c.get('exchange', 'binance')
            interval = c['interval']
            records.append((
                ts, symbol, exchange, interval,
                c['open_price'], c['high_price'], c['low_price'], c['close_price'],
                c['volume'], c.get('quote_volume'), c.get('trade_count'),
            ))
        async with self.pool.acquire() as conn:
            await conn.executemany(
                """
                INSERT INTO crypto_ohlcv (time, symbol, exchange, interval, open_price, high_price,
                                          low_price, close_price, volume, quote_volume, trade_count)
                VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
                ON CONFLICT (time, symbol, interval) DO UPDATE SET
                    open_price = EXCLUDED.open_price,
                    high_price = EXCLUDED.high_price,
                    low_price = EXCLUDED.low_price,
                    close_price = EXCLUDED.close_price,
                    volume = EXCLUDED.volume,
                    quote_volume = EXCLUDED.quote_volume,
                    trade_count = EXCLUDED.trade_count;
                """,
                records,
            )
        self.logger.debug(f"Inserted {len(records)} OHLCV records")

    async def insert_indicators_batch(self, symbol: str, interval: str,
                                      indicators_data: List[Dict[str, Any]]):
        """Insert a batch of technical indicators (upsert on conflict)."""
        if not indicators_data or not self.pool:
            return
        symbol_u = _safe_upper(symbol)
        records: List[Tuple[Any, ...]] = []
        for ind in indicators_data:
            ts = _ensure_dt_aware_utc(ind['time'])
            records.append((
                ts, symbol_u, 'binance', interval,
                ind['indicator_name'], ind.get('indicator_value'), ind.get('metadata'),
            ))
        async with self.pool.acquire() as conn:
            await conn.executemany(
                """
                INSERT INTO technical_indicators (time, symbol, exchange, interval,
                                                  indicator_name, indicator_value, metadata)
                VALUES ($1,$2,$3,$4,$5,$6,$7)
                ON CONFLICT (time, symbol, interval, indicator_name) DO UPDATE SET
                    indicator_value = EXCLUDED.indicator_value,
                    metadata = EXCLUDED.metadata;
                """,
                records,
            )
        self.logger.debug(f"Inserted {len(records)} indicator records for {symbol_u} {interval}")

    # --------------------------
    # Retrieval methods
    # --------------------------
    async def get_tick_data(self, symbol: str, start_time: datetime,
                            end_time: datetime) -> List[Dict[str, Any]]:
        """Get tick data for a symbol within a time range."""
        if not self.pool:
            return []
        symbol_u = _safe_upper(symbol)
        start_t = _ensure_dt_aware_utc(start_time)
        end_t = _ensure_dt_aware_utc(end_time)
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT time, symbol, exchange, price, quantity, trade_id, is_buyer_maker
                FROM crypto_ticks
                WHERE symbol = $1 AND time >= $2 AND time <= $3
                ORDER BY time ASC;
                """,
                symbol_u, start_t, end_t,
            )
            return [dict(row) for row in rows]

    async def get_ohlcv_data(self, symbol: str, interval: str,
                             limit: int = 1000) -> List[Dict[str, Any]]:
        """Get OHLCV data for a symbol and interval (newest first)."""
        if not self.pool:
            return []
        symbol_u = _safe_upper(symbol)
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT time, symbol, exchange, interval, open_price, high_price, low_price,
                       close_price, volume, quote_volume, trade_count
                FROM crypto_ohlcv
                WHERE symbol = $1 AND interval = $2
                ORDER BY time DESC
                LIMIT $3;
                """,
                symbol_u, interval, limit,
            )
            return [dict(row) for row in rows]

    async def get_recent_candles(self, symbol: str, interval: str,
                                 limit: int = 1000) -> List[Dict[str, Any]]:
        """
        Get recent candles for a symbol and interval (alias for chart display).

        Returns the most recent `limit` candles in ASCENDING order with
        JS-friendly field names.
        """
        if not self.pool:
            return []
        symbol_u = _safe_upper(symbol)
        async with self.pool.acquire() as conn:
            # Select the newest `limit` rows first, then re-sort ascending;
            # a plain ORDER BY time ASC LIMIT would return the OLDEST rows
            # instead of the most recent ones.
            rows = await conn.fetch(
                """
                SELECT * FROM (
                    SELECT time, symbol, exchange, interval, open_price, high_price, low_price,
                           close_price, volume, quote_volume, trade_count
                    FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = $2
                    ORDER BY time DESC
                    LIMIT $3
                ) recent
                ORDER BY time ASC;
                """,
                symbol_u, interval, limit,
            )
            result: List[Dict[str, Any]] = []
            for row in rows:
                t: Optional[datetime] = row['time']
                result.append({
                    'timestamp': t.isoformat() if t else None,
                    'symbol': row['symbol'],
                    'exchange': row['exchange'],
                    'interval': row['interval'],
                    'open': float(row['open_price']) if row['open_price'] is not None else None,
                    'high': float(row['high_price']) if row['high_price'] is not None else None,
                    'low': float(row['low_price']) if row['low_price'] is not None else None,
                    'close': float(row['close_price']) if row['close_price'] is not None else None,
                    'volume': float(row['volume']) if row['volume'] is not None else None,
                    'quote_volume': float(row['quote_volume']) if row['quote_volume'] is not None else None,
                    'trade_count': int(row['trade_count']) if row['trade_count'] is not None else None,
                })
            self.logger.info(f"Retrieved {len(result)} candles for {symbol_u} {interval}")
            return result

    async def get_latest_ohlcv(self, symbol: str, interval: str = '1d') -> Optional[Dict[str, Any]]:
        """Get the latest OHLCV record for a symbol."""
        if not self.pool:
            return None
        symbol_u = _safe_upper(symbol)
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT time, symbol, exchange, interval, open_price, high_price, low_price,
                       close_price, volume, quote_volume, trade_count
                FROM crypto_ohlcv
                WHERE symbol = $1 AND interval = $2
                ORDER BY time DESC
                LIMIT 1;
                """,
                symbol_u, interval,
            )
            return dict(row) if row else None

    async def get_indicators_data(self, symbol: str, interval: str, indicator_name: str,
                                  limit: int = 1000) -> List[Dict[str, Any]]:
        """Get technical indicator data."""
        if not self.pool:
            return []
        symbol_u = _safe_upper(symbol)
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT time, symbol, exchange, interval, indicator_name, indicator_value, metadata
                FROM technical_indicators
                WHERE symbol = $1 AND interval = $2 AND indicator_name = $3
                ORDER BY time DESC
                LIMIT $4;
                """,
                symbol_u, interval, indicator_name, limit,
            )
            return [dict(row) for row in rows]

    async def get_available_symbols(self) -> List[str]:
        """Get a list of all symbols present in the database."""
        if not self.pool:
            return []
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT DISTINCT symbol FROM crypto_ohlcv ORDER BY symbol ASC;"
            )
            return [row['symbol'] for row in rows]

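    # Chart-feed sketch (illustrative): the JS-friendly shape returned by
    # get_recent_candles maps directly onto most candlestick chart libraries:
    #
    #     candles = await db.get_recent_candles('BTCUSDT', '1h', limit=200)
    #     series = [{'x': c['timestamp'],
    #                'y': [c['open'], c['high'], c['low'], c['close']]}
    #               for c in candles]
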
    async def get_current_price_and_trends(self, symbol: str) -> Optional[Dict[str, Any]]:
        """
        Get the current price and percentage changes for multiple timeframes.

        Returns price trends for 15m, 1h, 1d, 1w.
        """
        if not self.pool:
            return None
        symbol_u = _safe_upper(symbol)
        now = _utc_now()
        time_15m_ago = now - timedelta(minutes=15)
        time_1h_ago = now - timedelta(hours=1)
        time_1d_ago = now - timedelta(days=1)
        time_1w_ago = now - timedelta(weeks=1)
        async with self.pool.acquire() as conn:
            try:
                current = await conn.fetchrow(
                    """
                    SELECT close_price, time FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1h'
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u,
                )
                if not current:
                    return None
                current_price = float(current['close_price'])
                current_time = current['time']

                price_15m = await conn.fetchrow(
                    """
                    SELECT close_price FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1m' AND time <= $2
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u, time_15m_ago,
                )
                price_1h = await conn.fetchrow(
                    """
                    SELECT close_price FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1h' AND time <= $2
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u, time_1h_ago,
                )
                price_1d = await conn.fetchrow(
                    """
                    SELECT close_price FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1h' AND time <= $2
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u, time_1d_ago,
                )
                price_1w = await conn.fetchrow(
                    """
                    SELECT close_price FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1d' AND time <= $2
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u, time_1w_ago,
                )

                def calc_change(old):
                    if old and float(old) > 0:
                        return round(((current_price - float(old)) / float(old)) * 100, 2)
                    return 0.0

                return {
                    'symbol': symbol_u,
                    'current_price': current_price,
                    'last_update': current_time.isoformat(),
                    'trends': {
                        '15m': calc_change(price_15m['close_price'] if price_15m else None),
                        '1h': calc_change(price_1h['close_price'] if price_1h else None),
                        '1d': calc_change(price_1d['close_price'] if price_1d else None),
                        '1w': calc_change(price_1w['close_price'] if price_1w else None),
                    },
                }
            except Exception as e:
                self.logger.error(f"Error getting price trends for {symbol_u}: {e}", exc_info=True)
                return None

    # --------------------------
    # Gap and coverage utilities
    # --------------------------
    def _interval_to_seconds(self, interval: str) -> int:
        """Convert an interval string (e.g. '5m', '1h') to seconds."""
        try:
            if interval.endswith('m'):
                return int(interval[:-1]) * 60
            if interval.endswith('h'):
                return int(interval[:-1]) * 3600
            if interval.endswith('d'):
                return int(interval[:-1]) * 86400
            if interval.endswith('w'):
                return int(interval[:-1]) * 604800
        except Exception:
            pass
        # Default to 1 minute
        return 60

    def _calculate_expected_records(self, start: datetime, end: datetime, interval: str) -> int:
        """Calculate the expected number of records for a time period."""
        start_u = _ensure_dt_aware_utc(start)
        end_u = _ensure_dt_aware_utc(end)
        if end_u < start_u:
            return 0
        total_seconds = (end_u - start_u).total_seconds()
        interval_seconds = max(1, self._interval_to_seconds(interval))
        return int(total_seconds // interval_seconds) + 1

    async def get_data_coverage_summary(self, symbol: str, interval: str) -> Dict[str, Any]:
        """Get a data coverage summary for a symbol and interval."""
        if not self.pool:
            return {
                'symbol': symbol, 'interval': interval,
                'first_record': None, 'last_record': None,
                'total_records': 0, 'expected_records': 0,
                'missing_records': 0, 'coverage_percent': 0.0,
            }
        symbol_u = _safe_upper(symbol)
        async with self.pool.acquire() as conn:
            result = await conn.fetchrow(
                """
                SELECT MIN(time) as first_record,
                       MAX(time) as last_record,
                       COUNT(*) as total_records
                FROM crypto_ohlcv
                WHERE symbol = $1 AND interval = $2;
                """,
                symbol_u, interval,
            )
            if not result or not result['first_record']:
                return {
                    'symbol': symbol_u, 'interval': interval,
                    'first_record': None, 'last_record': None,
                    'total_records': 0, 'expected_records': 0,
                    'missing_records': 0, 'coverage_percent': 0.0,
                }
            first_record: datetime = result['first_record']
            last_record: datetime = result['last_record']
            total_records: int = result['total_records']
            expected_records = self._calculate_expected_records(first_record, last_record, interval)
            missing_records = max(0, expected_records - total_records)
            coverage_percent = (total_records / expected_records * 100) if expected_records > 0 else 0.0
            return {
                'symbol': symbol_u,
                'interval': interval,
                'first_record': first_record.isoformat(),
                'last_record': last_record.isoformat(),
                'total_records': total_records,
                'expected_records': expected_records,
                'missing_records': missing_records,
                'coverage_percent': round(coverage_percent, 2),
            }

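    # Worked example (illustrative): for '5m' candles between 00:00:00 and
    # 01:00:00 UTC, total_seconds = 3600 and interval_seconds = 300, so
    # expected_records = 3600 // 300 + 1 = 13. Both endpoints are counted,
    # which matches the inclusive time >= / time <= range queries used here.
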
    async def get_existing_time_ranges(self, symbol: str, interval: str,
                                       start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """
        Get existing time ranges for a symbol and interval within a date range.

        Returns a list of contiguous data blocks to help identify gaps.
        """
        if not self.pool:
            return []
        symbol_u = _safe_upper(symbol)
        start_u = _ensure_dt_aware_utc(start_date)
        end_u = _ensure_dt_aware_utc(end_date)
        interval_seconds = self._interval_to_seconds(interval)
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                WITH ordered AS (
                    SELECT time, LAG(time) OVER (ORDER BY time) AS prev_time
                    FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = $2 AND time >= $3 AND time <= $4
                    ORDER BY time
                ),
                markers AS (
                    SELECT time,
                           CASE WHEN prev_time IS NULL
                                     OR EXTRACT(EPOCH FROM (time - prev_time)) > $5
                                THEN 1 ELSE 0 END AS is_new_block
                    FROM ordered
                ),
                blocks AS (
                    SELECT time, SUM(is_new_block) OVER (ORDER BY time) AS block_id
                    FROM markers
                )
                SELECT MIN(time) AS block_start, MAX(time) AS block_end
                FROM blocks
                GROUP BY block_id
                ORDER BY block_start;
                """,
                symbol_u, interval, start_u, end_u, interval_seconds + 1,
            )
            return [{'start': row['block_start'], 'end': row['block_end']} for row in rows]

    async def get_missing_time_ranges(self, symbol: str, interval: str,
                                      start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
        """Calculate missing time ranges that need to be downloaded."""
        start_u = _ensure_dt_aware_utc(start_date)
        end_u = _ensure_dt_aware_utc(end_date)
        if end_u <= start_u:
            return []
        existing_blocks = await self.get_existing_time_ranges(symbol, interval, start_u, end_u)
        if not existing_blocks:
            return [{'start': start_u, 'end': end_u}]
        missing: List[Dict[str, Any]] = []
        current = start_u
        for block in existing_blocks:
            block_start: datetime = _ensure_dt_aware_utc(block['start'])
            block_end: datetime = _ensure_dt_aware_utc(block['end'])
            if current < block_start:
                missing.append({'start': current, 'end': block_start})
            current = max(current, block_end + timedelta(microseconds=1))
        if current < end_u:
            missing.append({'start': current, 'end': end_u})
        return missing

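    # Worked example (illustrative): requesting 00:00-10:00 when data blocks
    # exist for 02:00-04:00 and 06:00-08:00 yields three missing ranges:
    # 00:00-02:00, then 04:00-06:00 (starting just past the first block's
    # end), then 08:00-10:00.
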
    async def check_data_exists_for_range(self, symbol: str, interval: str,
                                          start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """
        Quick check whether data exists for a specific range.

        Returns a dict with 'exists', 'count', 'expected_count',
        'coverage_percent', and 'is_complete'.
        """
        if not self.pool:
            return {'exists': False, 'count': 0, 'expected_count': 0,
                    'coverage_percent': 0.0, 'is_complete': False}
        symbol_u = _safe_upper(symbol)
        start_u = _ensure_dt_aware_utc(start_date)
        end_u = _ensure_dt_aware_utc(end_date)
        async with self.pool.acquire() as conn:
            result = await conn.fetchrow(
                """
                SELECT COUNT(*) as count FROM crypto_ohlcv
                WHERE symbol = $1 AND interval = $2 AND time >= $3 AND time <= $4;
                """,
                symbol_u, interval, start_u, end_u,
            )
            count = int(result['count']) if result else 0
            expected_count = self._calculate_expected_records(start_u, end_u, interval)
            coverage_percent = (count / expected_count * 100) if expected_count > 0 else 0.0
            return {
                'exists': count > 0,
                'count': count,
                'expected_count': expected_count,
                'coverage_percent': round(coverage_percent, 2),
                'is_complete': coverage_percent >= 99.0,
            }

    async def find_data_gaps(self, symbol: str, interval: str,
                             min_gap_size: int = 2) -> List[Dict[str, Any]]:
        """Find gaps in the data (missing consecutive candles)."""
        if not self.pool:
            return []
        symbol_u = _safe_upper(symbol)
        interval_seconds = self._interval_to_seconds(interval)
        threshold = interval_seconds * max(1, min_gap_size)
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                WITH s AS (
                    SELECT time, LAG(time) OVER (ORDER BY time) AS prev_time
                    FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = $2
                    ORDER BY time
                )
                SELECT prev_time AS gap_start,
                       time AS gap_end,
                       EXTRACT(EPOCH FROM (time - prev_time)) / $3 AS missing_candles
                FROM s
                WHERE prev_time IS NOT NULL
                  AND EXTRACT(EPOCH FROM (time - prev_time)) > $4
                ORDER BY prev_time;
                """,
                symbol_u, interval, interval_seconds, threshold,
            )
            gaps: List[Dict[str, Any]] = []
            for row in rows:
                start_dt: datetime = row['gap_start']
                end_dt: datetime = row['gap_end']
                # gap_start/gap_end are existing candles bounding the hole, so
                # the number of absent candles is one less than the span.
                missing = max(0, int(row['missing_candles']) - 1)
                gaps.append({
                    'gap_start': start_dt.isoformat(),
                    'gap_end': end_dt.isoformat(),
                    'missing_candles': missing,
                    'duration_hours': round((end_dt - start_dt).total_seconds() / 3600, 2),
                })
            return gaps

    async def detect_gaps(self, symbol: str, interval: str) -> Dict[str, Any]:
        """Detect data gaps for a symbol and interval."""
        coverage = await self.get_data_coverage_summary(symbol, interval)
        gaps = await self.find_data_gaps(symbol, interval, min_gap_size=2)
        return {'coverage': coverage, 'gaps': gaps}

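    # Output sketch (illustrative): for a '5m' series with real candles at
    # 00:05 and 00:20 and nothing between, a single gap entry looks like
    #
    #     {'gap_start': '2024-01-01T00:05:00+00:00',
    #      'gap_end': '2024-01-01T00:20:00+00:00',
    #      'missing_candles': 2,     # 00:10 and 00:15 are absent
    #      'duration_hours': 0.25}
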
    async def get_data_coverage_by_day(self, symbol: str, interval: str,
                                       start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """Get daily coverage statistics for a symbol/interval."""
        if not self.pool:
            return {'daily_coverage': []}
        symbol_u = _safe_upper(symbol)
        start_u = _ensure_dt_aware_utc(start_date)
        end_u = _ensure_dt_aware_utc(end_date)
        try:
            async with self.pool.acquire() as conn:
                interval_seconds = self._interval_to_seconds(interval)
                records_per_day = max(1, 86400 // max(1, interval_seconds))
                rows = await conn.fetch(
                    """
                    SELECT DATE(time) as date,
                           COUNT(*) as actual_records,
                           $3 as expected_records,
                           ROUND((COUNT(*)::decimal / $3) * 100, 2) as coverage_percent
                    FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = $2 AND time >= $4 AND time <= $5
                    GROUP BY DATE(time)
                    ORDER BY DATE(time) ASC;
                    """,
                    symbol_u, interval, records_per_day, start_u, end_u,
                )
                daily: List[Dict[str, Any]] = []
                for row in rows:
                    coverage_pct = float(row['coverage_percent'])
                    status = 'complete' if coverage_pct >= 95 else ('partial' if coverage_pct >= 50 else 'empty')
                    # row['date'] is a date; serialize to ISO date
                    d: date = row['date']
                    daily.append({
                        'date': d.isoformat(),
                        'actual_records': int(row['actual_records']),
                        'expected_records': int(row['expected_records']),
                        'coverage_percent': coverage_pct,
                        'status': status,
                    })
                return {
                    'symbol': symbol_u,
                    'interval': interval,
                    'start_date': start_u.isoformat(),
                    'end_date': end_u.isoformat(),
                    'daily_coverage': daily,
                }
        except Exception as e:
            self.logger.error(f"Error getting daily coverage: {e}", exc_info=True)
            return {'daily_coverage': []}

    async def get_all_pairs_gap_status(self) -> List[Dict[str, Any]]:
        """
        Get gap status for all trading pairs across all intervals.

        Returns a comprehensive status list for the monitoring UI.
        """
        from utils import load_config  # local import to avoid circular imports
        config = load_config()
        intervals = config.get('collection', {}).get(
            'candle_intervals', ['1m', '5m', '15m', '1h', '4h', '1d']
        )
        results: List[Dict[str, Any]] = []
        for pair in config.get('trading_pairs', []):
            symbol = pair['symbol']
            record_from_date = pair.get('record_from_date') or \
                config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
            for interval in intervals:
                try:
                    gap_info = await self.detect_gaps(symbol, interval)
                    coverage = gap_info.get('coverage', {})
                    gaps = gap_info.get('gaps', [])
                    # Check for an empty series first: it also reports zero
                    # gaps, so testing gaps first would misreport it as
                    # 'complete' and leave the 'empty' branch unreachable.
                    if coverage.get('total_records', 0) == 0:
                        status = 'empty'
                    elif len(gaps) == 0:
                        status = 'complete'
                    elif any(g['missing_candles'] > 100 for g in gaps):
                        status = 'filling'
                    else:
                        status = 'has_gaps'
                    fillable_gaps = [g for g in gaps if g['missing_candles'] <= 100]
                    results.append({
                        'symbol': symbol,
                        'interval': interval,
                        'status': status,
                        'total_gaps': len(gaps),
                        'fillable_gaps': len(fillable_gaps),
                        'coverage_percent': coverage.get('coverage_percent', 0),
                        'start_date': record_from_date,
                        'first_record': coverage.get('first_record'),
                        'last_record': coverage.get('last_record'),
                        'total_records': coverage.get('total_records', 0),
                        'expected_records': coverage.get('expected_records', 0),
                        'missing_records': coverage.get('missing_records', 0),
                        'gaps': gaps,
                    })
                except Exception as e:
                    self.logger.error(f"Error getting status for {symbol} {interval}: {e}")
                    continue
        return results

    async def fill_gaps_intelligently(self, symbol: str, interval: str,
                                      max_attempts: int = 3) -> Dict[str, Any]:
        """
        Intelligently fill gaps with retry logic, falling back to averaging
        for small gaps that cannot be filled from the exchange.
        """
        from binance.client import Client  # local import

        self.logger.info(f"Starting intelligent gap fill for {symbol} {interval}")
        gap_info = await self.detect_gaps(symbol, interval)
        gaps = gap_info.get('gaps', [])
        if not gaps:
            return {'status': 'success', 'message': 'No gaps found',
                    'gaps_filled': 0, 'averaged_candles': 0}

        api_key = os.getenv('BINANCE_API_KEY')
        secret_key = os.getenv('BINANCE_SECRET_KEY')
        # Note: this is the synchronous python-binance client, so each request
        # blocks the event loop for its duration.
        client = Client(api_key, secret_key) if api_key and secret_key else Client()

        gaps_filled = 0
        averaged_candles = 0
        failed_gaps: List[Dict[str, Any]] = []

        for gap in gaps:
            gap_start = _ensure_dt_aware_utc(datetime.fromisoformat(gap['gap_start']))
            gap_end = _ensure_dt_aware_utc(datetime.fromisoformat(gap['gap_end']))
            missing_candles = gap['missing_candles']
            if missing_candles > 1000:
                self.logger.info(f"Skipping large gap: {missing_candles} candles")
                continue

            filled = False
            for attempt in range(max_attempts):
                try:
                    self.logger.info(
                        f"Attempt {attempt + 1}/{max_attempts} to fill gap: {gap_start} to {gap_end}"
                    )
                    klines = client.get_historical_klines(
                        symbol=symbol,
                        interval=interval,
                        start_str=int(gap_start.timestamp() * 1000),
                        end_str=int(gap_end.timestamp() * 1000),
                        limit=1000,
                    )
                    if klines and len(klines) > 0:
                        # Normalize raw REST klines to the parse_kline_data shape
                        from utils import parse_kline_data
                        ohlcv_batch: List[Dict[str, Any]] = []
                        for k in klines:
                            try:
                                # Rebuild a websocket-style kline payload from the
                                # REST array: [open_time, open, high, low, close,
                                # volume, close_time, quote_volume, trade_count, ...]
                                ws_like = {
                                    'e': 'kline',
                                    'E': int(k[6]),
                                    's': _safe_upper(symbol),
                                    'k': {
                                        't': int(k[0]),
                                        'T': int(k[6]),
                                        's': _safe_upper(symbol),
                                        'i': interval,
                                        'o': str(k[1]),
                                        'h': str(k[2]),
                                        'l': str(k[3]),
                                        'c': str(k[4]),
                                        'v': str(k[5]),
                                        'q': str(k[7]),
                                        'n': int(k[8]),
                                        'x': True,
                                    },
                                }
                                parsed = parse_kline_data(ws_like)
                                ohlcv_batch.append(parsed)
                            except Exception as pe:
                                self.logger.error(f"Error parsing kline: {pe}")
                                continue
                        if ohlcv_batch:
                            await self.insert_ohlcv_batch(ohlcv_batch)
                            gaps_filled += 1
                            filled = True
                            self.logger.info(
                                f"Successfully filled gap with {len(ohlcv_batch)} records"
                            )
                            break
                        else:
                            self.logger.warning(
                                f"No parsed data produced for gap {gap_start} to {gap_end}"
                            )
                    else:
                        self.logger.warning(
                            f"No data returned from Binance for gap {gap_start} to {gap_end}"
                        )
                except Exception as e:
                    self.logger.error(f"Attempt {attempt + 1} failed: {e}")
                    if attempt < max_attempts - 1:
                        await asyncio.sleep(2 ** attempt)  # exponential backoff

            if not filled and missing_candles <= 5:
                try:
                    self.logger.info(
                        f"Using intelligent averaging for small gap: {missing_candles} candles"
                    )
                    averaged = await self._fill_gap_with_averaging(
                        symbol, interval, gap_start, gap_end, missing_candles
                    )
                    averaged_candles += averaged
                    if averaged > 0:
                        gaps_filled += 1
                except Exception as e:
                    self.logger.error(f"Error averaging gap: {e}")
                    failed_gaps.append(gap)
            elif not filled:
                failed_gaps.append(gap)

        return {
            'status': 'success',
            'message': f'Filled {gaps_filled} gaps ({averaged_candles} via averaging)',
            'gaps_filled': gaps_filled,
            'averaged_candles': averaged_candles,
            'failed_gaps': len(failed_gaps),
            'total_gaps': len(gaps),
        }

    async def get_prioritized_gaps(self, symbol: str, interval: str) -> List[Dict[str, Any]]:
        """
        Get gaps prioritized by importance (recent gaps first, then by size).

        This helps fill the most critical gaps first.
        """
        gaps = (await self.detect_gaps(symbol, interval)).get('gaps', [])
        if not gaps:
            return []
        now = _utc_now()
        for g in gaps:
            gap_end = _ensure_dt_aware_utc(datetime.fromisoformat(g['gap_end']))
            days_old = (now - gap_end).days
            recency_score = max(0, 365 - days_old) / 365 * 100.0
            size_score = min(100.0, 100.0 / max(1, g['missing_candles']))
            fillable_bonus = 50.0 if g['missing_candles'] <= 100 else 0.0
            g['priority_score'] = recency_score + size_score + fillable_bonus
            g['days_old'] = days_old
        gaps.sort(key=lambda x: x['priority_score'], reverse=True)
        return gaps

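    # Worked example (illustrative): a 3-candle gap that ended 30 days ago
    # scores recency = (365 - 30) / 365 * 100 ≈ 91.8, size = 100 / 3 ≈ 33.3,
    # plus the 50-point fillable bonus, for priority_score ≈ 175.1. An equally
    # recent 500-candle gap gets no bonus and scores ≈ 91.8 + 0.2 = 92.0, so
    # it sorts lower despite covering more missing data.
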
    async def _fill_gap_with_averaging(self, symbol: str, interval: str, gap_start: datetime,
                                       gap_end: datetime, missing_candles: int) -> int:
        """Fill a gap with synthetic candles averaged from surrounding data."""
        if not self.pool:
            return 0
        symbol_u = _safe_upper(symbol)
        start_u = _ensure_dt_aware_utc(gap_start)
        end_u = _ensure_dt_aware_utc(gap_end)
        async with self.pool.acquire() as conn:
            try:
                lookback = 10
                before = await conn.fetch(
                    """
                    SELECT open_price, high_price, low_price, close_price, volume
                    FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = $2 AND time < $3
                    ORDER BY time DESC LIMIT $4;
                    """,
                    symbol_u, interval, start_u, lookback,
                )
                after = await conn.fetch(
                    """
                    SELECT open_price, high_price, low_price, close_price, volume
                    FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = $2 AND time > $3
                    ORDER BY time ASC LIMIT $4;
                    """,
                    symbol_u, interval, end_u, lookback,
                )
                if not before or not after:
                    self.logger.warning("Not enough surrounding data for averaging")
                    return 0

                avg_open = sum(float(c['open_price']) for c in before) / len(before)
                avg_high = sum(float(c['high_price']) for c in before) / len(before)
                avg_low = sum(float(c['low_price']) for c in before) / len(before)
                avg_close = sum(float(c['close_price']) for c in before) / len(before)
                avg_volume = sum(float(c['volume']) for c in before) / len(before)

                interval_seconds = self._interval_to_seconds(interval)
                # gap_start/gap_end are real candles bounding the hole, so
                # generation starts one interval after gap_start and stops
                # strictly before gap_end; starting at gap_start itself would
                # let the upsert overwrite a genuine candle with synthetic data.
                current_time = start_u + timedelta(seconds=interval_seconds)
                gen: List[Dict[str, Any]] = []
                while current_time < end_u:
                    variation = random.uniform(0.999, 1.001)
                    gen.append({
                        "time": current_time,
                        "symbol": symbol_u,
                        "exchange": "binance",
                        "interval": interval,
                        "open_price": avg_open * variation,
                        "high_price": avg_high * variation * 1.001,
                        "low_price": avg_low * variation * 0.999,
                        "close_price": avg_close * variation,
                        "volume": avg_volume * variation,
                        "quote_volume": None,
                        "trade_count": None,
                    })
                    current_time = current_time + timedelta(seconds=interval_seconds)

                if gen:
                    await self.insert_ohlcv_batch(gen)
                    self.logger.info(f"Inserted {len(gen)} averaged candles")
                    return len(gen)
                return 0
            except Exception as e:
                self.logger.error(f"Error in averaging: {e}", exc_info=True)
                return 0

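    # Generation sketch (illustrative): for a '1m' gap bounded by real candles
    # at 00:00 and 00:04, synthetic candles are written at 00:01, 00:02, and
    # 00:03 only. Each one jitters the lookback averages by ±0.1% so the fill
    # does not render as a perfectly flat line.
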
    async def check_data_health(self, symbol: str, interval: str) -> Dict[str, Any]:
        """
        Comprehensive health check for a trading pair's data.

        Detects various data quality issues beyond just gaps.
        """
        coverage = await self.get_data_coverage_summary(symbol, interval)
        gaps = await self.find_data_gaps(symbol, interval)
        health_issues: List[Dict[str, Any]] = []

        # 1. Stale data
        last_record_iso = coverage.get('last_record')
        if last_record_iso:
            last_record = _ensure_dt_aware_utc(datetime.fromisoformat(last_record_iso))
            hours_since = (_utc_now() - last_record).total_seconds() / 3600
            if hours_since > 24:
                health_issues.append({
                    'severity': 'high', 'issue': 'stale_data',
                    'message': f'No data in last {hours_since:.1f} hours',
                })

        # 2. Excessive gaps
        if len(gaps) > 10:
            health_issues.append({
                'severity': 'medium', 'issue': 'fragmented_data',
                'message': f'{len(gaps)} gaps detected - data is fragmented',
            })

        # 3. Low coverage
        coverage_pct = coverage.get('coverage_percent', 0.0) or 0.0
        if coverage_pct < 50:
            health_issues.append({
                'severity': 'high', 'issue': 'low_coverage',
                'message': f'Only {coverage_pct:.1f}% data coverage',
            })
        elif coverage_pct < 80:
            health_issues.append({
                'severity': 'medium', 'issue': 'medium_coverage',
                'message': f'{coverage_pct:.1f}% data coverage - could be improved',
            })

        # 4. Zero-volume candles
        zero_vol_count = 0
        if self.pool:
            async with self.pool.acquire() as conn:
                zero_vol_count = await conn.fetchval(
                    """
                    SELECT COUNT(*) FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = $2 AND volume = 0;
                    """,
                    _safe_upper(symbol), interval,
                )
        if zero_vol_count and zero_vol_count > 0:
            health_issues.append({
                'severity': 'low', 'issue': 'zero_volume_candles',
                'message': f'{zero_vol_count} candles with zero volume detected',
            })

        # Score: start at 100 and subtract per issue by severity
        health_score = 100
        for issue in health_issues:
            if issue['severity'] == 'high':
                health_score -= 30
            elif issue['severity'] == 'medium':
                health_score -= 15
            elif issue['severity'] == 'low':
                health_score -= 5
        health_score = max(0, health_score)

        if health_score >= 90:
            status = 'excellent'
        elif health_score >= 70:
            status = 'good'
        elif health_score >= 50:
            status = 'fair'
        else:
            status = 'poor'

        return {
            'symbol': _safe_upper(symbol),
            'interval': interval,
            'health_score': health_score,
            'status': status,
            'issues': health_issues,
            'coverage_percent': coverage_pct,
            'total_gaps': len(gaps),
            'last_check': _utc_now().isoformat(),
        }

    async def get_detailed_statistics(self) -> Dict[str, Any]:
        """Get detailed database statistics for all symbols."""
        if not self.pool:
            return {'overall': {}, 'symbols': []}
        async with self.pool.acquire() as conn:
            overall = await conn.fetchrow(
                """
                SELECT COUNT(DISTINCT symbol) as total_symbols,
                       COUNT(*) as total_candles,
                       MIN(time) as first_record,
                       MAX(time) as last_record
                FROM crypto_ohlcv;
                """
            )
            symbols = await conn.fetch(
                """
                SELECT symbol,
                       COUNT(DISTINCT interval) as intervals_count,
                       COUNT(*) as total_candles,
                       MIN(time) as first_record,
                       MAX(time) as last_record
                FROM crypto_ohlcv
                GROUP BY symbol
                ORDER BY symbol;
                """
            )
            return {'overall': dict(overall) if overall else {}, 'symbols': [dict(r) for r in symbols]}

    async def get_gap_fill_progress(self, symbol: str, interval: str) -> Dict[str, Any]:
        """Get real-time progress of gap filling operations."""
        current = await self.get_data_coverage_summary(symbol, interval)
        missing = current.get('missing_records', 0)
        coverage = current.get('coverage_percent', 0.0)
        avg_fill_rate = 100.0  # assumed candles per minute (tunable)
        estimated_minutes = (missing / avg_fill_rate) if avg_fill_rate > 0 else 0.0
        return {
            'symbol': _safe_upper(symbol),
            'interval': interval,
            'current_coverage': coverage,
            'missing_records': missing,
            'estimated_time_minutes': round(estimated_minutes, 1),
            'estimated_time_human': self._format_duration(estimated_minutes),
            'status': 'complete' if coverage >= 99.9 else 'in_progress',
        }

    def _format_duration(self, minutes: float) -> str:
        """Convert minutes to a human-readable duration string."""
        if minutes < 1:
            return f"{int(minutes * 60)}s"
        if minutes < 60:
            return f"{int(minutes)}m"
        if minutes < 1440:
            hours = minutes / 60.0
            return f"{hours:.1f}h"
        days = minutes / 1440.0
        return f"{days:.1f}d"

    # --------------------------
    # Stats and utilities
    # --------------------------
    async def get_total_records(self) -> int:
        """Get the total number of records across all tables."""
        if not self.pool:
            return 0
        async with self.pool.acquire() as conn:
            result = await conn.fetchrow(
                """
                SELECT (SELECT COUNT(*) FROM crypto_ticks)
                     + (SELECT COUNT(*) FROM crypto_ohlcv)
                     + (SELECT COUNT(*) FROM technical_indicators) as total;
                """
            )
            return int(result['total']) if result else 0

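    # Scoring sketch (illustrative): one 'high' issue (-30) plus one 'low'
    # issue (-5) leaves health_score = 65, which maps to status 'fair'
    # (90+ excellent, 70+ good, 50+ fair, below 50 poor).
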
    async def get_last_update_time(self) -> str:
        """Get the last update time across all tables."""
        if not self.pool:
            return "Never"
        async with self.pool.acquire() as conn:
            result = await conn.fetchrow(
                """
                SELECT MAX(last_update) as last_update FROM (
                    SELECT MAX(time) as last_update FROM crypto_ticks
                    UNION ALL
                    SELECT MAX(time) as last_update FROM crypto_ohlcv
                    UNION ALL
                    SELECT MAX(time) as last_update FROM technical_indicators
                ) sub;
                """
            )
            if result and result['last_update']:
                return result['last_update'].isoformat()
            return "Never"

    async def get_symbol_statistics(self, symbol: str) -> Dict[str, Any]:
        """Get statistics for a specific symbol."""
        if not self.pool:
            return {}
        symbol_u = _safe_upper(symbol)
        async with self.pool.acquire() as conn:
            stats = await conn.fetchrow(
                """
                SELECT (SELECT COUNT(*) FROM crypto_ticks WHERE symbol = $1) as tick_count,
                       (SELECT COUNT(*) FROM crypto_ohlcv WHERE symbol = $1) as ohlcv_count,
                       (SELECT COUNT(*) FROM technical_indicators WHERE symbol = $1) as indicator_count,
                       (SELECT MIN(time) FROM crypto_ohlcv WHERE symbol = $1) as first_record,
                       (SELECT MAX(time) FROM crypto_ohlcv WHERE symbol = $1) as last_record;
                """,
                symbol_u,
            )
            return dict(stats) if stats else {}

    async def get_all_symbols_summary(self) -> List[Dict[str, Any]]:
        """Get summary statistics for all symbols."""
        if not self.pool:
            return []
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT symbol,
                       COUNT(DISTINCT interval) as intervals_count,
                       MIN(time) as first_record,
                       MAX(time) as last_record,
                       COUNT(*) as total_candles
                FROM crypto_ohlcv
                GROUP BY symbol
                ORDER BY symbol;
                """
            )
            return [dict(row) for row in rows]

    async def get_all_gaps_summary(self) -> List[Dict[str, Any]]:
        """Get a gap summary for all symbols and intervals."""
        symbols = await self.get_available_symbols()
        intervals = ['1m', '5m', '15m', '1h', '4h', '1d']
        summary: List[Dict[str, Any]] = []
        for symbol in symbols:
            for interval in intervals:
                try:
                    gap_info = await self.detect_gaps(symbol, interval)
                    coverage = gap_info['coverage']
                    if coverage.get('total_records', 0) > 0:
                        summary.append({
                            'symbol': symbol,
                            'interval': interval,
                            'total_gaps': len(gap_info['gaps']),
                            'fillable_gaps': len([g for g in gap_info['gaps'] if g['missing_candles'] < 1000]),
                            'total_missing_candles': sum(g['missing_candles'] for g in gap_info['gaps']),
                            'coverage_percent': coverage['coverage_percent'],
                        })
                except Exception as e:
                    self.logger.warning(f"Error getting gaps for {symbol} {interval}: {e}")
        return summary

    async def get_current_price_and_trends_with_volume(self, symbol: str) -> Optional[Dict[str, Any]]:
        """
        Get the current price, percentage changes, and a robust 15m volume
        anomaly status.

        The volume baseline blends per-minute stats from the last 4h, 24h,
        and 7d (excluding the most recent 15m), scales them to a 15-minute
        window, and compares the latest 15m volume against the blend via
        z-score and percentage deviation.
        """
        if not self.pool:
            return None
        symbol_u = _safe_upper(symbol)
        now = _utc_now()
        time_15m_ago = now - timedelta(minutes=15)
        time_1h_ago = now - timedelta(hours=1)
        time_1d_ago = now - timedelta(days=1)
        time_1w_ago = now - timedelta(weeks=1)
        baseline_end = time_15m_ago
        start_4h = baseline_end - timedelta(hours=4)
        start_24h = baseline_end - timedelta(hours=24)
        start_7d = baseline_end - timedelta(days=7)
        async with self.pool.acquire() as conn:
            try:
                current = await conn.fetchrow(
                    """
                    SELECT close_price, volume, time FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1m'
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u,
                )
                if not current:
                    return None
                current_price = float(current['close_price'])
                current_time = current['time']

                # Price anchors
                price_15m = await conn.fetchrow(
                    """
                    SELECT close_price FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1m' AND time <= $2
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u, time_15m_ago,
                )
                price_1h = await conn.fetchrow(
                    """
                    SELECT close_price FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1h' AND time <= $2
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u, time_1h_ago,
                )
                price_1d = await conn.fetchrow(
                    """
                    SELECT close_price FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1h' AND time <= $2
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u, time_1d_ago,
                )
                price_1w = await conn.fetchrow(
                    """
                    SELECT close_price FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1d' AND time <= $2
                    ORDER BY time DESC LIMIT 1;
                    """,
                    symbol_u, time_1w_ago,
                )

                # Baselines and the last 15m of volume in one pass
                stats = await conn.fetchrow(
                    """
                    SELECT
                        COALESCE(SUM(volume) FILTER (WHERE time > $2 AND time <= $3), 0) AS vol_15m,
                        AVG(volume)         FILTER (WHERE time > $4 AND time <= $2) AS avg_4h,
                        STDDEV_SAMP(volume) FILTER (WHERE time > $4 AND time <= $2) AS std_4h,
                        COUNT(*)            FILTER (WHERE time > $4 AND time <= $2) AS n_4h,
                        AVG(volume)         FILTER (WHERE time > $5 AND time <= $2) AS avg_24h,
                        STDDEV_SAMP(volume) FILTER (WHERE time > $5 AND time <= $2) AS std_24h,
                        COUNT(*)            FILTER (WHERE time > $5 AND time <= $2) AS n_24h,
                        AVG(volume)         FILTER (WHERE time > $6 AND time <= $2) AS avg_7d,
                        STDDEV_SAMP(volume) FILTER (WHERE time > $6 AND time <= $2) AS std_7d,
                        COUNT(*)            FILTER (WHERE time > $6 AND time <= $2) AS n_7d
                    FROM crypto_ohlcv
                    WHERE symbol = $1 AND interval = '1m';
                    """,
                    symbol_u, time_15m_ago, current_time, start_4h, start_24h, start_7d,
                )
                last_15m_volume = float(stats['vol_15m']) if stats and stats['vol_15m'] is not None else 0.0

                def _to_float(x):
                    return float(x) if x is not None else None

                avg4 = _to_float(stats['avg_4h'])
                std4 = _to_float(stats['std_4h'])
                n4 = int(stats['n_4h']) if stats and stats['n_4h'] is not None else 0
                avg24 = _to_float(stats['avg_24h'])
                std24 = _to_float(stats['std_24h'])
                n24 = int(stats['n_24h']) if stats and stats['n_24h'] is not None else 0
                avg7 = _to_float(stats['avg_7d'])
                std7 = _to_float(stats['std_7d'])
                n7 = int(stats['n_7d']) if stats and stats['n_7d'] is not None else 0

                def scale_to_15m(avg_per_min, std_per_min):
                    # Scale per-minute stats to a 15-minute window: the mean
                    # scales linearly; the standard deviation scales by
                    # sqrt(15), treating per-minute volumes as roughly
                    # independent so their variances add.
                    mean15 = (avg_per_min or 0.0) * 15.0
                    std15 = (std_per_min or 0.0) * math.sqrt(15.0)
                    return mean15, std15

                mean15_4, std15_4 = scale_to_15m(avg4, std4)
                mean15_24, std15_24 = scale_to_15m(avg24, std24)
                mean15_7, std15_7 = scale_to_15m(avg7, std7)

                # Weights gated by data availability
                w4 = 0.6 if n4 >= 60 else 0.0      # at least 1 hour of 1m bars
                w24 = 0.3 if n24 >= 240 else 0.0   # at least 4 hours of 1m bars
                w7 = 0.1 if n7 >= 2000 else 0.0    # roughly >1.4 days of 1m bars
                total_w = w4 + w24 + w7
                if total_w == 0.0:
                    # Not enough history for the gated weights: fall back to an
                    # equal-weight blend of whatever baselines have any data.
                    candidates = []
                    if n4 > 0:
                        candidates.append((mean15_4, std15_4, 1.0))
                    if n24 > 0:
                        candidates.append((mean15_24, std15_24, 1.0))
                    if n7 > 0:
                        candidates.append((mean15_7, std15_7, 1.0))
                    total_w = sum(w for _, _, w in candidates) or 1.0
                    blended_mean15 = sum(m * w for m, _, w in candidates) / total_w
                    blended_var15 = sum(((s or 0.0) ** 2) * w for _, s, w in candidates) / total_w
                else:
                    blended_mean15 = (mean15_4 * w4 + mean15_24 * w24 + mean15_7 * w7) / total_w
                    blended_var15 = (((std15_4 or 0.0) ** 2) * w4
                                     + ((std15_24 or 0.0) ** 2) * w24
                                     + ((std15_7 or 0.0) ** 2) * w7) / total_w
                blended_std15 = math.sqrt(blended_var15) if blended_var15 > 0 else 0.0

                deviation_pct = 0.0
                zscore = 0.0
                if blended_mean15 > 0:
                    deviation_pct = ((last_15m_volume - blended_mean15) / blended_mean15) * 100.0
                if blended_std15 > 0:
                    zscore = (last_15m_volume - blended_mean15) / blended_std15

                volume_status = "Average"
                if blended_mean15 <= 0 and last_15m_volume > 0:
                    volume_status = "Unusually High"
                else:
                    if zscore >= 2.5 or deviation_pct >= 50.0:
                        volume_status = "Unusually High"
                    elif zscore >= 1.5 or deviation_pct >= 20.0:
                        volume_status = "High"
                    elif zscore <= -2.5 or deviation_pct <= -50.0:
                        volume_status = "Unusually Low"
                    elif zscore <= -1.5 or deviation_pct <= -20.0:
                        volume_status = "Low"

                current_volume = float(current['volume'])

                def calc_change(old):
                    if old and float(old) > 0:
                        return round(((current_price - float(old)) / float(old)) * 100.0, 2)
                    return 0.0

                return {
                    'symbol': symbol_u,
                    'current_price': current_price,
                    'current_volume': current_volume,
                    'volume_status': volume_status,
                    'last_update': current_time.isoformat(),
                    'trends': {
                        '15m': calc_change(price_15m['close_price'] if price_15m else None),
                        '1h': calc_change(price_1h['close_price'] if price_1h else None),
                        '1d': calc_change(price_1d['close_price'] if price_1d else None),
                        '1w': calc_change(price_1w['close_price'] if price_1w else None),
                    },
                    'volume_context': {
                        'last_15m_volume': round(last_15m_volume, 8),
                        'expected_15m_volume': round(blended_mean15, 8),
                        'zscore': round(zscore, 3),
                        'deviation_pct': round(deviation_pct, 2),
                        'baselines': {'n_4h': n4, 'n_24h': n24, 'n_7d': n7},
                    },
                }
            except Exception as e:
                self.logger.error(
                    f"Error getting price trends with volume for {symbol_u}: {e}", exc_info=True
                )
                return None

    async def get_gap_fill_status(self, symbol: str, interval: str) -> Dict[str, Any]:
        """Get gap fill status for a symbol and interval."""
        gaps = await self.detect_gaps(symbol, interval)
        return {
            'symbol': _safe_upper(symbol),
            'interval': interval,
            'gaps': gaps['gaps'],
            'coverage': gaps['coverage'],
        }

    async def fill_genuine_gaps_with_averages(self, symbol: str, interval: str,
                                              max_consecutive: int = 5, lookback: int = 10) -> int:
        """Fill genuine empty gaps with intelligent averaging."""
        self.logger.info(f"Filling genuine gaps for {symbol} {interval}")
        gaps_info = await self.detect_gaps(symbol, interval)
        gaps = gaps_info.get("gaps", [])
        if not gaps:
            return 0
        filled_count = 0
        for gap in gaps:
            gap_start = _ensure_dt_aware_utc(datetime.fromisoformat(gap['gap_start']))
            gap_end = _ensure_dt_aware_utc(datetime.fromisoformat(gap['gap_end']))
            missing_candles = gap['missing_candles']
            if missing_candles > max_consecutive:
                continue
            try:
                filled = await self._fill_gap_with_averaging(
                    symbol, interval, gap_start, gap_end, missing_candles
                )
                filled_count += filled
            except Exception as e:
                self.logger.error(f"Error filling gap with averaging: {e}")
                continue
        return filled_count

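    # Worked example (illustrative): with a blended 15m mean of 1000 and a
    # blended std of 200, a last-15m volume of 1600 gives
    # zscore = (1600 - 1000) / 200 = 3.0 and deviation_pct = +60%, so
    # volume_status resolves to "Unusually High" (zscore >= 2.5).
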
    async def cleanup_old_data(self, retention_days: int = 365):
        """Clean up old data based on the retention policy.

        Ticks are pruned more aggressively (at most 30 days) than OHLCV and
        indicator data.
        """
        if not self.pool:
            return
        cutoff_date = _utc_now() - timedelta(days=retention_days)
        tick_cutoff = _utc_now() - timedelta(days=min(retention_days, 30))
        async with self.pool.acquire() as conn:
            await conn.execute("DELETE FROM crypto_ticks WHERE time < $1;", tick_cutoff)
            await conn.execute("DELETE FROM crypto_ohlcv WHERE time < $1;", cutoff_date)
            await conn.execute("DELETE FROM technical_indicators WHERE time < $1;", cutoff_date)
        self.logger.info(f"Cleaned up data older than {retention_days} days")

    async def vacuum_analyze(self):
        """Perform database maintenance (VACUUM ANALYZE on each table)."""
        if not self.pool:
            return
        async with self.pool.acquire() as conn:
            tables = ['crypto_ticks', 'crypto_ohlcv', 'technical_indicators']
            for t in tables:
                try:
                    await conn.execute(f"VACUUM ANALYZE {t};")
                except Exception as e:
                    self.logger.warning(f"VACUUM ANALYZE warning on {t}: {e}")
        self.logger.info("Database vacuum and analyze completed")

    @asynccontextmanager
    async def transaction(self):
        """Context manager for database transactions."""
        if not self.pool:
            raise RuntimeError("Pool not initialized")
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                yield conn

    async def close(self):
        """Close the database connection pool."""
        if self.pool:
            await self.pool.close()
            self.logger.info("Database connection pool closed")


# Utility function to create the database if it does not exist
async def create_database_if_not_exists():
    """Create the database if it doesn't exist (connects via the postgres db)."""
    conn = await asyncpg.connect(
        host=os.getenv('DB_HOST', 'localhost'),
        port=int(os.getenv('DB_PORT', '5432')),
        database='postgres',
        user=os.getenv('DB_USER', 'postgres'),
        password=os.getenv('DB_PASSWORD', 'password'),
    )
    db_name = os.getenv('DB_NAME', 'crypto_trading')
    try:
        exists = await conn.fetchval("SELECT 1 FROM pg_database WHERE datname = $1", db_name)
        if not exists:
            await conn.execute(f'CREATE DATABASE "{db_name}";')
            print(f"Database '{db_name}' created successfully")
        else:
            print(f"Database '{db_name}' already exists")
    except Exception as e:
        print(f"Error creating database: {e}")
    finally:
        await conn.close()


if __name__ == "__main__":
    async def test_db():
        await create_database_if_not_exists()
        db = DatabaseManager()
        await db.initialize()
        stats = await db.get_total_records()
        print(f"Total records: {stats}")
        await db.close()

    asyncio.run(test_db())
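

# Usage sketch (illustrative): a hypothetical health-report helper built on
# the manager's public API. The function name is an example, not part of the
# module's required interface.
async def example_health_report() -> None:
    db = DatabaseManager()
    await db.initialize()
    try:
        for symbol in await db.get_available_symbols():
            health = await db.check_data_health(symbol, '1h')
            print(f"{symbol}: {health['status']} (score {health['health_score']})")
    finally:
        await db.close()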