lewismac 2708bcb176 feat: Add 100% technical indicator coverage tracking and backfilling
- Add check_indicator_coverage() to detect OHLCV records missing indicators
- Add get_ohlcv_missing_indicators() to identify specific gaps
- Add backfill_missing_indicators() to fill missing indicator data
- Add get_ohlcv_data_range() helper for fetching historical data
- Add get_all_indicator_coverage_status() for system-wide monitoring
- Define REQUIRED_INDICATORS constant with all 16 required indicators
- Process backfills in configurable batches to manage memory
- Calculate indicators using existing utils.calculate_technical_indicators()
- Track coverage statistics before/after backfill operations
- Support automated indicator completeness verification

This ensures every crypto_ohlcv record has all 16 technical indicators
(adx_14, atr_14, bb_lower, bb_middle, bb_upper, ema_12, ema_26,
macd_histogram, macd_line, macd_signal, rsi_14, sma_20, sma_200,
sma_50, stoch_d, stoch_k) calculated and stored
2025-10-09 08:46:25 +01:00

#!/usr/bin/env python3
"""
db.py - TimescaleDB Database Operations and Schema Management
Database operations, connection pooling, and schema management for crypto trading data
"""
import asyncio
import logging
import os
from datetime import datetime, timedelta, timezone, date, time as dt_time
from typing import Dict, List, Optional, Any, Tuple
import asyncpg
from contextlib import asynccontextmanager
from dotenv import load_dotenv
# Load environment variables
load_dotenv('variables.env')
def _utc_now() -> datetime:
return datetime.now(timezone.utc)
def _ensure_dt_aware_utc(dt: datetime) -> datetime:
if isinstance(dt, dt_time):
dt = datetime.combine(_utc_now().date(), dt)
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def _safe_upper(s: Optional[str]) -> Optional[str]:
return s.upper() if isinstance(s, str) else s
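# Behavior sketch for these helpers (illustrative values):
#   _ensure_dt_aware_utc(datetime(2024, 1, 1, 12, 0))
#     -> datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)   # naive -> assumed UTC
#   _ensure_dt_aware_utc(datetime(2024, 1, 1, 13, 0, tzinfo=timezone(timedelta(hours=1))))
#     -> datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)   # converted to UTC
#   _safe_upper('btcusdt') -> 'BTCUSDT'; non-strings pass through unchanged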
class DatabaseManager:
"""Manages TimescaleDB operations with connection pooling"""
def __init__(self):
self.pool: Optional[asyncpg.Pool] = None
self.logger = logging.getLogger(__name__)
self._connection_semaphore: Optional[asyncio.Semaphore] = None
# Database connection parameters
self.db_config = {
'host': os.getenv('DB_HOST', 'localhost'),
'port': int(os.getenv('DB_PORT', 5432)),
'database': os.getenv('DB_NAME', 'crypto_trading'),
'user': os.getenv('DB_USER', 'postgres'),
'password': os.getenv('DB_PASSWORD', 'password'),
'min_size': int(os.getenv('DB_POOL_MIN_SIZE', 10)),
'max_size': int(os.getenv('DB_POOL_MAX_SIZE', 50)),
'command_timeout': int(os.getenv('DB_COMMAND_TIMEOUT', 60)),
}
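# The pool is configured entirely from variables.env. A minimal example file
# (placeholder values, not shipped defaults):
#   DB_HOST=localhost
#   DB_PORT=5432
#   DB_NAME=crypto_trading
#   DB_USER=postgres
#   DB_PASSWORD=secret
#   DB_POOL_MIN_SIZE=10
#   DB_POOL_MAX_SIZE=50
#   DB_COMMAND_TIMEOUT=60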
async def initialize(self):
"""Initialize database connection pool and create tables"""
try:
self.logger.info("Initializing database connection pool")
self.pool = await asyncpg.create_pool(
host=self.db_config['host'],
port=self.db_config['port'],
database=self.db_config['database'],
user=self.db_config['user'],
password=self.db_config['password'],
min_size=self.db_config['min_size'],
max_size=self.db_config['max_size'],
command_timeout=self.db_config['command_timeout'],
)
# Initialize semaphore to prevent connection exhaustion (20% headroom)
max_concurrent = max(1, int(self.db_config['max_size'] * 0.8))
self._connection_semaphore = asyncio.Semaphore(max_concurrent)
self.logger.info(f"Database connection pool created successfully (max concurrent: {max_concurrent})")
# Create tables and hypertables
await self.create_schema()
self.logger.info("Database initialization complete")
except Exception as e:
self.logger.error(f"Database initialization failed: {e}", exc_info=True)
raise
@asynccontextmanager
async def acquire_with_semaphore(self):
"""Acquire connection with semaphore to prevent pool exhaustion"""
if self._connection_semaphore is None or self.pool is None:
raise RuntimeError("DatabaseManager not initialized")
async with self._connection_semaphore:
async with self.pool.acquire() as conn:
yield conn
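# Usage sketch (hypothetical caller): the semaphore caps concurrent
# acquisitions at 80% of the pool size, so bursty callers queue here rather
# than exhausting the asyncpg pool:
#   async with db.acquire_with_semaphore() as conn:
#       await conn.execute("SELECT 1;")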
async def create_schema(self):
"""Create database schema with TimescaleDB hypertables"""
if not self.pool:
raise RuntimeError("Pool not initialized")
async with self.pool.acquire() as conn:
try:
# Try to enable TimescaleDB extension
try:
await conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;")
timescale_ok = True
except Exception as e:
self.logger.warning(f"TimescaleDB extension not available or failed to enable: {e}")
timescale_ok = False
# Create tables
await conn.execute("""
CREATE TABLE IF NOT EXISTS crypto_ticks (
time TIMESTAMPTZ NOT NULL,
symbol TEXT NOT NULL,
exchange TEXT NOT NULL DEFAULT 'binance',
price DECIMAL(20,8) NOT NULL,
quantity DECIMAL(20,8) NOT NULL,
trade_id BIGINT,
is_buyer_maker BOOLEAN,
PRIMARY KEY (time, symbol, trade_id)
);
""")
await conn.execute("""
CREATE TABLE IF NOT EXISTS crypto_ohlcv (
time TIMESTAMPTZ NOT NULL,
symbol TEXT NOT NULL,
exchange TEXT NOT NULL DEFAULT 'binance',
interval TEXT NOT NULL,
open_price DECIMAL(20,8) NOT NULL,
high_price DECIMAL(20,8) NOT NULL,
low_price DECIMAL(20,8) NOT NULL,
close_price DECIMAL(20,8) NOT NULL,
volume DECIMAL(20,8) NOT NULL,
quote_volume DECIMAL(20,8),
trade_count INTEGER,
PRIMARY KEY (time, symbol, interval)
);
""")
await conn.execute("""
CREATE TABLE IF NOT EXISTS technical_indicators (
time TIMESTAMPTZ NOT NULL,
symbol TEXT NOT NULL,
exchange TEXT NOT NULL DEFAULT 'binance',
interval TEXT NOT NULL,
indicator_name TEXT NOT NULL,
indicator_value DECIMAL(20,8),
metadata JSONB,
PRIMARY KEY (time, symbol, interval, indicator_name)
);
""")
# Create hypertables
if timescale_ok:
try:
await conn.execute("SELECT create_hypertable('crypto_ticks', 'time', if_not_exists => TRUE);")
except asyncpg.PostgresError as e:
self.logger.debug(f"crypto_ticks hypertable setup note: {e}")
try:
await conn.execute("SELECT create_hypertable('crypto_ohlcv', 'time', if_not_exists => TRUE);")
except asyncpg.PostgresError as e:
self.logger.debug(f"crypto_ohlcv hypertable setup note: {e}")
try:
await conn.execute("SELECT create_hypertable('technical_indicators', 'time', if_not_exists => TRUE);")
except asyncpg.PostgresError as e:
self.logger.debug(f"technical_indicators hypertable setup note: {e}")
# Create indexes for better query performance
await self.create_indexes(conn)
# Setup compression policies when possible
if timescale_ok:
await self.setup_compression_policies(conn)
else:
self.logger.info("Skipping compression policies because TimescaleDB extension is unavailable")
self.logger.info("Database schema created successfully")
except Exception as e:
self.logger.error(f"Error creating database schema: {e}", exc_info=True)
raise
async def create_indexes(self, conn: asyncpg.Connection):
"""Create indexes for better query performance"""
index_sqls = [
# Ticks indexes
"CREATE INDEX IF NOT EXISTS idx_crypto_ticks_symbol_time ON crypto_ticks (symbol, time DESC);",
"CREATE INDEX IF NOT EXISTS idx_crypto_ticks_time_symbol ON crypto_ticks (time DESC, symbol);",
# OHLCV indexes
"CREATE INDEX IF NOT EXISTS idx_crypto_ohlcv_symbol_interval_time ON crypto_ohlcv (symbol, interval, time DESC);",
"CREATE INDEX IF NOT EXISTS idx_crypto_ohlcv_time_symbol ON crypto_ohlcv (time DESC, symbol);",
# Indicators indexes
"CREATE INDEX IF NOT EXISTS idx_technical_indicators_symbol_indicator_time ON technical_indicators (symbol, indicator_name, time DESC);",
"CREATE INDEX IF NOT EXISTS idx_technical_indicators_time_symbol ON technical_indicators (time DESC, symbol);",
]
for sql in index_sqls:
try:
await conn.execute(sql)
except Exception as e:
self.logger.warning(f"Index creation warning: {e}")
async def setup_compression_policies(self, conn: asyncpg.Connection):
"""Setup TimescaleDB compression policies"""
try:
compression_alters = [
"""
ALTER TABLE crypto_ticks SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'symbol,exchange',
timescaledb.compress_orderby = 'time DESC'
);
""",
"""
ALTER TABLE crypto_ohlcv SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'symbol,exchange,interval',
timescaledb.compress_orderby = 'time DESC'
);
""",
"""
ALTER TABLE technical_indicators SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'symbol,exchange,interval,indicator_name',
timescaledb.compress_orderby = 'time DESC'
);
""",
]
for q in compression_alters:
try:
await conn.execute(q)
except Exception as e:
self.logger.warning(f"Compression setup warning: {e}")
policies = [
"SELECT add_compression_policy('crypto_ticks', INTERVAL '7 days');",
"SELECT add_compression_policy('crypto_ohlcv', INTERVAL '7 days');",
"SELECT add_compression_policy('technical_indicators', INTERVAL '7 days');",
]
for q in policies:
try:
await conn.execute(q)
except Exception as e:
self.logger.warning(f"Compression policy warning: {e}")
except Exception as e:
self.logger.warning(f"Compression setup failed: {e}")
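# To verify compression later, TimescaleDB exposes informational views
# (names assume TimescaleDB 2.x and vary slightly across versions):
#   SELECT * FROM timescaledb_information.compression_settings;
#   SELECT * FROM timescaledb_information.jobs WHERE proc_name = 'policy_compression';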
# --------------------------
# Insertion methods
# --------------------------
async def insert_tick_single(self, tick_data: Dict[str, Any]):
"""Insert single tick record"""
if not self.pool:
raise RuntimeError("Pool not initialized")
tick_time = _ensure_dt_aware_utc(tick_data['time'])
symbol = _safe_upper(tick_data['symbol'])
exchange = tick_data.get('exchange', 'binance')
async with self.pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO crypto_ticks
(time, symbol, exchange, price, quantity, trade_id, is_buyer_maker)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (time, symbol, trade_id) DO NOTHING;
""",
tick_time, symbol, exchange,
tick_data['price'], tick_data['quantity'], tick_data['trade_id'],
tick_data.get('is_buyer_maker', None),
)
async def insert_ticks_batch(self, ticks_data: List[Dict[str, Any]]):
"""Insert multiple tick records in batch"""
if not ticks_data or not self.pool:
return
records: List[Tuple[Any, ...]] = []
for t in ticks_data:
tick_time = _ensure_dt_aware_utc(t['time'])
symbol = _safe_upper(t['symbol'])
exchange = t.get('exchange', 'binance')
records.append((
tick_time, symbol, exchange, t['price'], t['quantity'],
t['trade_id'], t.get('is_buyer_maker', None)
))
async with self.pool.acquire() as conn:
await conn.executemany(
"""
INSERT INTO crypto_ticks
(time, symbol, exchange, price, quantity, trade_id, is_buyer_maker)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (time, symbol, trade_id) DO NOTHING;
""",
records
)
self.logger.debug(f"Inserted {len(records)} tick records")
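# Example tick payload accepted by insert_tick_single and insert_ticks_batch
# (values are illustrative; 'time' may be naive and is coerced to UTC):
#   {'time': datetime(2024, 1, 1, tzinfo=timezone.utc), 'symbol': 'BTCUSDT',
#    'exchange': 'binance', 'price': 42000.5, 'quantity': 0.01,
#    'trade_id': 123456789, 'is_buyer_maker': False}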
async def insert_ohlcv_single(self, ohlcv_data: Dict[str, Any]):
"""Insert single OHLCV record"""
if not self.pool:
raise RuntimeError("Pool not initialized")
ts = _ensure_dt_aware_utc(ohlcv_data['time'])
symbol = _safe_upper(ohlcv_data['symbol'])
exchange = ohlcv_data.get('exchange', 'binance')
interval = ohlcv_data['interval']
async with self.pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO crypto_ohlcv
(time, symbol, exchange, interval, open_price, high_price,
low_price, close_price, volume, quote_volume, trade_count)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (time, symbol, interval) DO UPDATE SET
open_price = EXCLUDED.open_price,
high_price = EXCLUDED.high_price,
low_price = EXCLUDED.low_price,
close_price = EXCLUDED.close_price,
volume = EXCLUDED.volume,
quote_volume = EXCLUDED.quote_volume,
trade_count = EXCLUDED.trade_count;
""",
ts, symbol, exchange, interval,
ohlcv_data['open_price'], ohlcv_data['high_price'],
ohlcv_data['low_price'], ohlcv_data['close_price'],
ohlcv_data['volume'], ohlcv_data.get('quote_volume'),
ohlcv_data.get('trade_count'),
)
async def insert_ohlcv_batch(self, ohlcv_data: List[Dict[str, Any]]):
"""Insert multiple OHLCV records in batch"""
if not ohlcv_data or not self.pool:
return
records: List[Tuple[Any, ...]] = []
for c in ohlcv_data:
ts = _ensure_dt_aware_utc(c['time'])
symbol = _safe_upper(c['symbol'])
exchange = c.get('exchange', 'binance')
interval = c['interval']
records.append((
ts, symbol, exchange, interval,
c['open_price'], c['high_price'], c['low_price'], c['close_price'],
c['volume'], c.get('quote_volume'), c.get('trade_count')
))
async with self.pool.acquire() as conn:
await conn.executemany(
"""
INSERT INTO crypto_ohlcv
(time, symbol, exchange, interval, open_price, high_price,
low_price, close_price, volume, quote_volume, trade_count)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
ON CONFLICT (time, symbol, interval) DO UPDATE SET
open_price = EXCLUDED.open_price,
high_price = EXCLUDED.high_price,
low_price = EXCLUDED.low_price,
close_price = EXCLUDED.close_price,
volume = EXCLUDED.volume,
quote_volume = EXCLUDED.quote_volume,
trade_count = EXCLUDED.trade_count;
""",
records
)
self.logger.debug(f"Inserted {len(records)} OHLCV records")
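# Candle dicts mirror the crypto_ohlcv columns; 'exchange' defaults to
# 'binance', and 'quote_volume'/'trade_count' are optional:
#   {'time': datetime(...), 'symbol': 'BTCUSDT', 'interval': '1h',
#    'open_price': 42000.0, 'high_price': 42100.0, 'low_price': 41900.0,
#    'close_price': 42050.0, 'volume': 12.5}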
async def insert_indicators_batch(self, symbol: str, interval: str, indicators_data: List[Dict[str, Any]]):
"""Insert technical indicators batch"""
if not indicators_data or not self.pool:
return
symbol_u = _safe_upper(symbol)
records: List[Tuple[Any, ...]] = []
for ind in indicators_data:
ts = _ensure_dt_aware_utc(ind['time'])
records.append((
ts, symbol_u, 'binance', interval,
ind['indicator_name'], ind.get('indicator_value'),
ind.get('metadata'),
))
async with self.pool.acquire() as conn:
await conn.executemany(
"""
INSERT INTO technical_indicators
(time, symbol, exchange, interval, indicator_name, indicator_value, metadata)
VALUES ($1,$2,$3,$4,$5,$6,$7)
ON CONFLICT (time, symbol, interval, indicator_name) DO UPDATE SET
indicator_value = EXCLUDED.indicator_value,
metadata = EXCLUDED.metadata;
""",
records
)
self.logger.debug(f"Inserted {len(records)} indicator records for {symbol_u} {interval}")
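# Each element of indicators_data is expected to look like
# (metadata is optional JSONB):
#   {'time': datetime(...), 'indicator_name': 'rsi_14',
#    'indicator_value': 63.2, 'metadata': None}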
# --------------------------
# Retrieval methods
# --------------------------
async def get_tick_data(self, symbol: str, start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
"""Get tick data for a symbol within time range"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
start_t = _ensure_dt_aware_utc(start_time)
end_t = _ensure_dt_aware_utc(end_time)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT time, symbol, exchange, price, quantity, trade_id, is_buyer_maker
FROM crypto_ticks
WHERE symbol = $1 AND time >= $2 AND time <= $3
ORDER BY time ASC;
""",
symbol_u, start_t, end_t
)
return [dict(row) for row in rows]
async def get_ohlcv_data(self, symbol: str, interval: str, limit: int = 1000) -> List[Dict[str, Any]]:
"""Get OHLCV data for a symbol and interval (returns newest first)"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT time, symbol, exchange, interval, open_price, high_price, low_price,
close_price, volume, quote_volume, trade_count
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
ORDER BY time DESC
LIMIT $3;
""",
symbol_u, interval, limit
)
return [dict(row) for row in rows]
async def get_recent_candles(self, symbol: str, interval: str, limit: int = 1000) -> List[Dict[str, Any]]:
"""
Get recent candles for a symbol and interval (alias for chart display)
Returns data in ASCENDING order with JS-friendly field names
"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT time, symbol, exchange, interval, open_price, high_price, low_price,
close_price, volume, quote_volume, trade_count
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
ORDER BY time ASC
LIMIT $3;
""",
symbol_u, interval, limit
)
result: List[Dict[str, Any]] = []
for row in rows:
t: Optional[datetime] = row['time']
result.append({
'timestamp': t.isoformat() if t else None,
'symbol': row['symbol'],
'exchange': row['exchange'],
'interval': row['interval'],
'open': float(row['open_price']) if row['open_price'] is not None else None,
'high': float(row['high_price']) if row['high_price'] is not None else None,
'low': float(row['low_price']) if row['low_price'] is not None else None,
'close': float(row['close_price']) if row['close_price'] is not None else None,
'volume': float(row['volume']) if row['volume'] is not None else None,
'quote_volume': float(row['quote_volume']) if row['quote_volume'] is not None else None,
'trade_count': int(row['trade_count']) if row['trade_count'] is not None else None,
})
self.logger.info(f"Retrieved {len(result)} candles for {symbol_u} {interval}")
return result
async def get_latest_ohlcv(self, symbol: str, interval: str = '1d') -> Optional[Dict[str, Any]]:
"""Get latest OHLCV record for a symbol"""
if not self.pool:
return None
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT time, symbol, exchange, interval, open_price, high_price, low_price,
close_price, volume, quote_volume, trade_count
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
ORDER BY time DESC
LIMIT 1;
""",
symbol_u, interval
)
return dict(row) if row else None
async def get_indicators_data(self, symbol: str, interval: str, indicator_name: str,
limit: int = 1000) -> List[Dict[str, Any]]:
"""Get technical indicator data"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT time, symbol, exchange, interval, indicator_name, indicator_value, metadata
FROM technical_indicators
WHERE symbol = $1 AND interval = $2 AND indicator_name = $3
ORDER BY time DESC
LIMIT $4;
""",
symbol_u, interval, indicator_name, limit
)
return [dict(row) for row in rows]
async def get_available_symbols(self) -> List[str]:
"""Get list of all available symbols in the database"""
if not self.pool:
return []
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT DISTINCT symbol FROM crypto_ohlcv ORDER BY symbol ASC;
"""
)
return [row['symbol'] for row in rows]
async def get_current_price_and_trends(self, symbol: str) -> Optional[Dict[str, Any]]:
"""
Get current price and percentage changes for multiple timeframes
Returns price trends for 15m, 1h, 1d, 1w
"""
if not self.pool:
return None
symbol_u = _safe_upper(symbol)
now = _utc_now()
time_15m_ago = now - timedelta(minutes=15)
time_1h_ago = now - timedelta(hours=1)
time_1d_ago = now - timedelta(days=1)
time_1w_ago = now - timedelta(weeks=1)
async with self.pool.acquire() as conn:
try:
current = await conn.fetchrow(
"""
SELECT close_price, time
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1h'
ORDER BY time DESC
LIMIT 1;
""",
symbol_u
)
if not current:
return None
current_price = float(current['close_price'])
current_time = current['time']
price_15m = await conn.fetchrow(
"""
SELECT close_price FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1m' AND time <= $2
ORDER BY time DESC LIMIT 1;
""",
symbol_u, time_15m_ago
)
price_1h = await conn.fetchrow(
"""
SELECT close_price FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1h' AND time <= $2
ORDER BY time DESC LIMIT 1;
""",
symbol_u, time_1h_ago
)
price_1d = await conn.fetchrow(
"""
SELECT close_price FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1h' AND time <= $2
ORDER BY time DESC LIMIT 1;
""",
symbol_u, time_1d_ago
)
price_1w = await conn.fetchrow(
"""
SELECT close_price FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1d' AND time <= $2
ORDER BY time DESC LIMIT 1;
""",
symbol_u, time_1w_ago
)
def calc_change(old):
if old and float(old) > 0:
return round(((current_price - float(old)) / float(old)) * 100, 2)
return 0.0
return {
'symbol': symbol_u,
'current_price': current_price,
'last_update': current_time.isoformat(),
'trends': {
'15m': calc_change(price_15m['close_price'] if price_15m else None),
'1h': calc_change(price_1h['close_price'] if price_1h else None),
'1d': calc_change(price_1d['close_price'] if price_1d else None),
'1w': calc_change(price_1w['close_price'] if price_1w else None),
},
}
except Exception as e:
self.logger.error(f"Error getting price trends for {symbol_u}: {e}", exc_info=True)
return None
# --------------------------
# Gap and coverage utilities
# --------------------------
def _interval_to_seconds(self, interval: str) -> int:
"""Convert interval string to seconds"""
try:
if interval.endswith('m'):
return int(interval[:-1]) * 60
if interval.endswith('h'):
return int(interval[:-1]) * 3600
if interval.endswith('d'):
return int(interval[:-1]) * 86400
if interval.endswith('w'):
return int(interval[:-1]) * 604800
except Exception:
pass
# Default to 1 minute
return 60
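# e.g. '1m' -> 60, '5m' -> 300, '4h' -> 14400, '1d' -> 86400, '1w' -> 604800;
# unrecognized strings fall back to 60 seconds.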
def _calculate_expected_records(self, start: datetime, end: datetime, interval: str) -> int:
"""Calculate expected number of records for a time period"""
start_u = _ensure_dt_aware_utc(start)
end_u = _ensure_dt_aware_utc(end)
if end_u < start_u:
return 0
total_seconds = (end_u - start_u).total_seconds()
interval_seconds = max(1, self._interval_to_seconds(interval))
return int(total_seconds // interval_seconds) + 1
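# Worked example: a day of '1h' candles from 00:00 to 00:00 the next day
# spans 86400 / 3600 = 24 steps, and the +1 counts both endpoints
# inclusively, giving 25 expected records.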
async def get_data_coverage_summary(self, symbol: str, interval: str) -> Dict[str, Any]:
"""Get data coverage summary for a symbol and interval"""
if not self.pool:
return {
'symbol': symbol, 'interval': interval,
'first_record': None, 'last_record': None,
'total_records': 0, 'expected_records': 0,
'missing_records': 0, 'coverage_percent': 0.0,
}
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
result = await conn.fetchrow(
"""
SELECT MIN(time) as first_record,
MAX(time) as last_record,
COUNT(*) as total_records
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2;
""",
symbol_u, interval
)
if not result or not result['first_record']:
return {
'symbol': symbol_u, 'interval': interval,
'first_record': None, 'last_record': None,
'total_records': 0, 'expected_records': 0,
'missing_records': 0, 'coverage_percent': 0.0,
}
first_record: datetime = result['first_record']
last_record: datetime = result['last_record']
total_records: int = result['total_records']
expected_records = self._calculate_expected_records(first_record, last_record, interval)
missing_records = max(0, expected_records - total_records)
coverage_percent = (total_records / expected_records * 100) if expected_records > 0 else 0.0
return {
'symbol': symbol_u,
'interval': interval,
'first_record': first_record.isoformat(),
'last_record': last_record.isoformat(),
'total_records': total_records,
'expected_records': expected_records,
'missing_records': missing_records,
'coverage_percent': round(coverage_percent, 2),
}
async def get_existing_time_ranges(self, symbol: str, interval: str,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""
Get existing time ranges for a symbol and interval within a date range.
Returns list of existing data blocks to help identify gaps.
"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
start_u = _ensure_dt_aware_utc(start_date)
end_u = _ensure_dt_aware_utc(end_date)
interval_seconds = self._interval_to_seconds(interval)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
WITH ordered AS (
SELECT time,
LAG(time) OVER (ORDER BY time) AS prev_time
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
AND time >= $3 AND time <= $4
ORDER BY time
),
markers AS (
SELECT time,
CASE
WHEN prev_time IS NULL OR EXTRACT(EPOCH FROM (time - prev_time)) > $5
THEN 1 ELSE 0
END AS is_new_block
FROM ordered
),
blocks AS (
SELECT time,
SUM(is_new_block) OVER (ORDER BY time) AS block_id
FROM markers
)
SELECT MIN(time) AS block_start, MAX(time) AS block_end
FROM blocks
GROUP BY block_id
ORDER BY block_start;
""",
symbol_u, interval, start_u, end_u, interval_seconds + 1
)
ranges: List[Dict[str, Any]] = []
for row in rows:
ranges.append({'start': row['block_start'], 'end': row['block_end']})
return ranges
async def get_missing_time_ranges(self, symbol: str, interval: str,
start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Calculate missing time ranges that need to be downloaded."""
start_u = _ensure_dt_aware_utc(start_date)
end_u = _ensure_dt_aware_utc(end_date)
if end_u <= start_u:
return []
existing_blocks = await self.get_existing_time_ranges(symbol, interval, start_u, end_u)
if not existing_blocks:
return [{'start': start_u, 'end': end_u}]
missing: List[Dict[str, Any]] = []
current = start_u
for block in existing_blocks:
block_start: datetime = _ensure_dt_aware_utc(block['start'])
block_end: datetime = _ensure_dt_aware_utc(block['end'])
if current < block_start:
missing.append({'start': current, 'end': block_start})
current = max(current, block_end + timedelta(microseconds=1))
if current < end_u:
missing.append({'start': current, 'end': end_u})
return missing
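# Example: asking for 00:00-10:00 when candles exist for 02:00-04:00 and
# 06:00-07:00 yields missing ranges [00:00, 02:00], [04:00, 06:00] and
# [07:00, 10:00] (each range after a block starts one microsecond past
# that block's end).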
async def check_data_exists_for_range(self, symbol: str, interval: str,
start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""
Quick check if data exists for a specific range.
Returns:
Dict with 'exists', 'count', 'expected_count', 'coverage_percent', 'is_complete'
"""
if not self.pool:
return {'exists': False, 'count': 0, 'expected_count': 0, 'coverage_percent': 0.0, 'is_complete': False}
symbol_u = _safe_upper(symbol)
start_u = _ensure_dt_aware_utc(start_date)
end_u = _ensure_dt_aware_utc(end_date)
async with self.pool.acquire() as conn:
result = await conn.fetchrow(
"""
SELECT COUNT(*) as count
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
AND time >= $3 AND time <= $4;
""",
symbol_u, interval, start_u, end_u
)
count = int(result['count']) if result else 0
expected_count = self._calculate_expected_records(start_u, end_u, interval)
coverage_percent = (count / expected_count * 100) if expected_count > 0 else 0.0
return {
'exists': count > 0,
'count': count,
'expected_count': expected_count,
'coverage_percent': round(coverage_percent, 2),
'is_complete': coverage_percent >= 99.0,
}
async def find_data_gaps(self, symbol: str, interval: str, min_gap_size: int = 2) -> List[Dict[str, Any]]:
"""Find gaps in data (missing consecutive candles)"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
interval_seconds = self._interval_to_seconds(interval)
threshold = interval_seconds * max(1, min_gap_size)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
WITH s AS (
SELECT time, LAG(time) OVER (ORDER BY time) AS prev_time
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
ORDER BY time
)
SELECT
prev_time AS gap_start,
time AS gap_end,
EXTRACT(EPOCH FROM (time - prev_time)) / $3 AS missing_candles
FROM s
WHERE prev_time IS NOT NULL
AND EXTRACT(EPOCH FROM (time - prev_time)) > $4
ORDER BY prev_time;
""",
symbol_u, interval, interval_seconds, threshold
)
gaps: List[Dict[str, Any]] = []
for row in rows:
start_dt: datetime = row['gap_start']
end_dt: datetime = row['gap_end']
missing = max(0, int(row['missing_candles']) - 1)
gaps.append({
'gap_start': start_dt.isoformat(),
'gap_end': end_dt.isoformat(),
'missing_candles': missing,
'duration_hours': round((end_dt - start_dt).total_seconds() / 3600, 2),
})
return gaps
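# Reading the math: two consecutive rows 5 intervals apart give
# EXTRACT(EPOCH ...) / interval_seconds = 5, meaning 4 candles are actually
# missing between them, hence the int(...) - 1 above.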
async def detect_gaps(self, symbol: str, interval: str) -> Dict[str, Any]:
"""Detect data gaps for a symbol and interval"""
coverage = await self.get_data_coverage_summary(symbol, interval)
gaps = await self.find_data_gaps(symbol, interval, min_gap_size=2)
return {'coverage': coverage, 'gaps': gaps}
async def get_data_coverage_by_day(self, symbol: str, interval: str,
start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Get daily coverage statistics for a symbol/interval"""
if not self.pool:
return {'daily_coverage': []}
symbol_u = _safe_upper(symbol)
start_u = _ensure_dt_aware_utc(start_date)
end_u = _ensure_dt_aware_utc(end_date)
try:
async with self.pool.acquire() as conn:
interval_seconds = self._interval_to_seconds(interval)
records_per_day = max(1, 86400 // max(1, interval_seconds))
rows = await conn.fetch(
"""
SELECT DATE(time) as date,
COUNT(*) as actual_records,
$3 as expected_records,
ROUND((COUNT(*)::decimal / $3) * 100, 2) as coverage_percent
FROM crypto_ohlcv
WHERE symbol = $1
AND interval = $2
AND time >= $4
AND time <= $5
GROUP BY DATE(time)
ORDER BY DATE(time) ASC;
""",
symbol_u, interval, records_per_day, start_u, end_u
)
daily: List[Dict[str, Any]] = []
for row in rows:
coverage_pct = float(row['coverage_percent'])
status = 'complete' if coverage_pct >= 95 else ('partial' if coverage_pct >= 50 else 'empty')
# row['date'] is a date; serialize to ISO date
d: date = row['date']
daily.append({
'date': d.isoformat(),
'actual_records': int(row['actual_records']),
'expected_records': int(row['expected_records']),
'coverage_percent': coverage_pct,
'status': status,
})
return {
'symbol': symbol_u,
'interval': interval,
'start_date': start_u.isoformat(),
'end_date': end_u.isoformat(),
'daily_coverage': daily,
}
except Exception as e:
self.logger.error(f"Error getting daily coverage: {e}", exc_info=True)
return {'daily_coverage': []}
async def get_all_pairs_gap_status(self) -> List[Dict[str, Any]]:
"""
Get gap status for all trading pairs across all intervals
Returns comprehensive status for the monitoring UI
"""
from utils import load_config # local import to avoid circular imports
config = load_config()
intervals = config.get('collection', {}).get('candle_intervals', ['1m', '5m', '15m', '1h', '4h', '1d'])
results: List[Dict[str, Any]] = []
for pair in config.get('trading_pairs', []):
symbol = pair['symbol']
record_from_date = pair.get('record_from_date') or \
config.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
for interval in intervals:
try:
gap_info = await self.detect_gaps(symbol, interval)
coverage = gap_info.get('coverage', {})
gaps = gap_info.get('gaps', [])
if len(gaps) == 0:
status = 'complete'
elif any(g['missing_candles'] > 100 for g in gaps):
status = 'filling'
elif coverage.get('total_records', 0) == 0:
status = 'empty'
else:
status = 'has_gaps'
fillable_gaps = [g for g in gaps if g['missing_candles'] <= 100]
results.append({
'symbol': symbol,
'interval': interval,
'status': status,
'total_gaps': len(gaps),
'fillable_gaps': len(fillable_gaps),
'coverage_percent': coverage.get('coverage_percent', 0),
'start_date': record_from_date,
'first_record': coverage.get('first_record'),
'last_record': coverage.get('last_record'),
'total_records': coverage.get('total_records', 0),
'expected_records': coverage.get('expected_records', 0),
'missing_records': coverage.get('missing_records', 0),
'gaps': gaps,
})
except Exception as e:
self.logger.error(f"Error getting status for {symbol} {interval}: {e}")
continue
return results
async def fill_gaps_intelligently(self, symbol: str, interval: str, max_attempts: int = 3) -> Dict[str, Any]:
"""
Intelligently fill gaps with retry logic and averaging for unfillable gaps
"""
from binance.client import Client # local import
self.logger.info(f"Starting intelligent gap fill for {symbol} {interval}")
gap_info = await self.detect_gaps(symbol, interval)
gaps = gap_info.get('gaps', [])
if not gaps:
return {'status': 'success', 'message': 'No gaps found', 'gaps_filled': 0, 'averaged_candles': 0}
api_key = os.getenv('BINANCE_API_KEY')
secret_key = os.getenv('BINANCE_SECRET_KEY')
client = Client(api_key, secret_key) if api_key and secret_key else Client()
gaps_filled = 0
averaged_candles = 0
failed_gaps: List[Dict[str, Any]] = []
for gap in gaps:
gap_start = _ensure_dt_aware_utc(datetime.fromisoformat(gap['gap_start']))
gap_end = _ensure_dt_aware_utc(datetime.fromisoformat(gap['gap_end']))
missing_candles = gap['missing_candles']
if missing_candles > 1000:
self.logger.info(f"Skipping large gap: {missing_candles} candles")
continue
filled = False
for attempt in range(max_attempts):
try:
self.logger.info(f"Attempt {attempt + 1}/{max_attempts} to fill gap: {gap_start} to {gap_end}")
# python-binance's Client is synchronous; run the blocking call in a
# worker thread so it does not stall the event loop.
klines = await asyncio.to_thread(
client.get_historical_klines,
symbol=symbol,
interval=interval,
start_str=int(gap_start.timestamp() * 1000),
end_str=int(gap_end.timestamp() * 1000),
limit=1000
)
if klines and len(klines) > 0:
# Normalize to parse_kline_data shape
from utils import parse_kline_data
ohlcv_batch: List[Dict[str, Any]] = []
for k in klines:
try:
ws_like = {
'e': 'kline',
'E': int(k[6]),
's': _safe_upper(symbol),
'k': {
't': int(k[0]),
'T': int(k[6]),
's': _safe_upper(symbol),
'i': interval,
'o': str(k[1]),
'h': str(k[2]),
'l': str(k[3]),
'c': str(k[4]),
'v': str(k[5]),
'q': str(k[7]),
'n': int(k[8]),
'x': True,
},
}
parsed = parse_kline_data(ws_like)
ohlcv_batch.append(parsed)
except Exception as pe:
self.logger.error(f"Error parsing kline: {pe}")
continue
if ohlcv_batch:
await self.insert_ohlcv_batch(ohlcv_batch)
gaps_filled += 1
filled = True
self.logger.info(f"Successfully filled gap with {len(ohlcv_batch)} records")
break
else:
self.logger.warning(f"No parsed data produced for gap {gap_start} to {gap_end}")
else:
self.logger.warning(f"No data returned from Binance for gap {gap_start} to {gap_end}")
except Exception as e:
self.logger.error(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_attempts - 1:
await asyncio.sleep(2 ** attempt)
if not filled and missing_candles <= 5:
try:
self.logger.info(f"Using intelligent averaging for small gap: {missing_candles} candles")
averaged = await self._fill_gap_with_averaging(symbol, interval, gap_start, gap_end, missing_candles)
averaged_candles += averaged
if averaged > 0:
gaps_filled += 1
except Exception as e:
self.logger.error(f"Error averaging gap: {e}")
failed_gaps.append(gap)
elif not filled:
failed_gaps.append(gap)
return {
'status': 'success',
'message': f'Filled {gaps_filled} gaps ({averaged_candles} via averaging)',
'gaps_filled': gaps_filled,
'averaged_candles': averaged_candles,
'failed_gaps': len(failed_gaps),
'total_gaps': len(gaps),
}
async def get_prioritized_gaps(self, symbol: str, interval: str) -> List[Dict[str, Any]]:
"""
Get gaps prioritized by importance (recent gaps first, then by size)
This helps fill the most critical gaps first
"""
gaps = (await self.detect_gaps(symbol, interval)).get('gaps', [])
if not gaps:
return []
now = _utc_now()
for g in gaps:
gap_end = datetime.fromisoformat(g['gap_end'])
gap_end = _ensure_dt_aware_utc(gap_end)
days_old = (now - gap_end).days
recency_score = max(0, 365 - days_old) / 365 * 100.0
size_score = min(100.0, 100.0 / max(1, g['missing_candles']))
fillable_bonus = 50.0 if g['missing_candles'] <= 100 else 0.0
g['priority_score'] = recency_score + size_score + fillable_bonus
g['days_old'] = days_old
gaps.sort(key=lambda x: x['priority_score'], reverse=True)
return gaps
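# Scoring sketch: a 2-candle gap that ended yesterday scores about
# 364/365*100 = 99.7 (recency) + 100/2 = 50 (size) + 50 (fillable bonus),
# roughly 200 total; a 500-candle gap from a year ago scores about 0.2 and
# sorts last.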
async def _fill_gap_with_averaging(self, symbol: str, interval: str,
gap_start: datetime, gap_end: datetime, missing_candles: int) -> int:
"""Fill a gap using intelligent averaging based on surrounding candles"""
if not self.pool:
return 0
symbol_u = _safe_upper(symbol)
start_u = _ensure_dt_aware_utc(gap_start)
end_u = _ensure_dt_aware_utc(gap_end)
async with self.pool.acquire() as conn:
try:
lookback = 10
before = await conn.fetch(
"""
SELECT open_price, high_price, low_price, close_price, volume
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2 AND time < $3
ORDER BY time DESC LIMIT $4;
""",
symbol_u, interval, start_u, lookback
)
after = await conn.fetch(
"""
SELECT open_price, high_price, low_price, close_price, volume
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2 AND time > $3
ORDER BY time ASC LIMIT $4;
""",
symbol_u, interval, end_u, lookback
)
if not before or not after:
self.logger.warning("Not enough surrounding data for averaging")
return 0
avg_open = sum(float(c['open_price']) for c in before) / len(before)
avg_high = sum(float(c['high_price']) for c in before) / len(before)
avg_low = sum(float(c['low_price']) for c in before) / len(before)
avg_close = sum(float(c['close_price']) for c in before) / len(before)
avg_volume = sum(float(c['volume']) for c in before) / len(before)
interval_seconds = self._interval_to_seconds(interval)
# Start one interval after gap_start: the boundary candles already exist,
# and insert_ohlcv_batch upserts on conflict, so starting at gap_start
# itself would overwrite a real candle with a synthetic one.
current_time = start_u + timedelta(seconds=interval_seconds)
gen: List[Dict[str, Any]] = []
import random
while current_time < end_u:
variation = random.uniform(0.999, 1.001)
gen.append({
"time": current_time,
"symbol": symbol_u,
"exchange": "binance",
"interval": interval,
"open_price": avg_open * variation,
"high_price": avg_high * variation * 1.001,
"low_price": avg_low * variation * 0.999,
"close_price": avg_close * variation,
"volume": avg_volume * variation,
"quote_volume": None,
"trade_count": None,
})
current_time = current_time + timedelta(seconds=interval_seconds)
if gen:
await self.insert_ohlcv_batch(gen)
self.logger.info(f"Inserted {len(gen)} averaged candles")
return len(gen)
return 0
except Exception as e:
self.logger.error(f"Error in averaging: {e}", exc_info=True)
return 0
async def check_data_health(self, symbol: str, interval: str) -> Dict[str, Any]:
"""
Comprehensive health check for a trading pair's data
Detects various data quality issues beyond just gaps
"""
coverage = await self.get_data_coverage_summary(symbol, interval)
gaps = await self.find_data_gaps(symbol, interval)
health_issues: List[Dict[str, Any]] = []
# 1. Stale data
last_record_iso = coverage.get('last_record')
if last_record_iso:
last_record = _ensure_dt_aware_utc(datetime.fromisoformat(last_record_iso))
hours_since = (_utc_now() - last_record).total_seconds() / 3600
if hours_since > 24:
health_issues.append({'severity': 'high', 'issue': 'stale_data',
'message': f'No data in last {hours_since:.1f} hours'})
# 2. Excessive gaps
if len(gaps) > 10:
health_issues.append({'severity': 'medium', 'issue': 'fragmented_data',
'message': f'{len(gaps)} gaps detected - data is fragmented'})
# 3. Low coverage
coverage_pct = coverage.get('coverage_percent', 0.0) or 0.0
if coverage_pct < 50:
health_issues.append({'severity': 'high', 'issue': 'low_coverage',
'message': f'Only {coverage_pct:.1f}% data coverage'})
elif coverage_pct < 80:
health_issues.append({'severity': 'medium', 'issue': 'medium_coverage',
'message': f'{coverage_pct:.1f}% data coverage - could be improved'})
# 4. Zero-volume candles
zero_vol_count = 0
if self.pool:
async with self.pool.acquire() as conn:
zero_vol_count = await conn.fetchval(
"""
SELECT COUNT(*) FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2 AND volume = 0;
""",
_safe_upper(symbol), interval
)
if zero_vol_count and zero_vol_count > 0:
health_issues.append({'severity': 'low', 'issue': 'zero_volume_candles',
'message': f'{zero_vol_count} candles with zero volume detected'})
# Score
health_score = 100
for issue in health_issues:
if issue['severity'] == 'high':
health_score -= 30
elif issue['severity'] == 'medium':
health_score -= 15
elif issue['severity'] == 'low':
health_score -= 5
health_score = max(0, health_score)
if health_score >= 90:
status = 'excellent'
elif health_score >= 70:
status = 'good'
elif health_score >= 50:
status = 'fair'
else:
status = 'poor'
return {
'symbol': _safe_upper(symbol),
'interval': interval,
'health_score': health_score,
'status': status,
'issues': health_issues,
'coverage_percent': coverage_pct,
'total_gaps': len(gaps),
'last_check': _utc_now().isoformat(),
}
async def get_detailed_statistics(self) -> Dict[str, Any]:
"""Get detailed database statistics for all symbols"""
if not self.pool:
return {'overall': {}, 'symbols': []}
async with self.pool.acquire() as conn:
overall = await conn.fetchrow(
"""
SELECT
COUNT(DISTINCT symbol) as total_symbols,
COUNT(*) as total_candles,
MIN(time) as first_record,
MAX(time) as last_record
FROM crypto_ohlcv;
"""
)
symbols = await conn.fetch(
"""
SELECT
symbol,
COUNT(DISTINCT interval) as intervals_count,
COUNT(*) as total_candles,
MIN(time) as first_record,
MAX(time) as last_record
FROM crypto_ohlcv
GROUP BY symbol
ORDER BY symbol;
"""
)
return {'overall': dict(overall) if overall else {}, 'symbols': [dict(r) for r in symbols]}
async def get_gap_fill_progress(self, symbol: str, interval: str) -> Dict[str, Any]:
"""
Get real-time progress of gap filling operations
"""
current = await self.get_data_coverage_summary(symbol, interval)
missing = current.get('missing_records', 0)
coverage = current.get('coverage_percent', 0.0)
avg_fill_rate = 100.0 # candles per minute (tunable)
estimated_minutes = (missing / avg_fill_rate) if avg_fill_rate > 0 else 0.0
return {
'symbol': _safe_upper(symbol),
'interval': interval,
'current_coverage': coverage,
'missing_records': missing,
'estimated_time_minutes': round(estimated_minutes, 1),
'estimated_time_human': self._format_duration(estimated_minutes),
'status': 'complete' if coverage >= 99.9 else 'in_progress',
}
def _format_duration(self, minutes: float) -> str:
"""Convert minutes to human-readable format"""
if minutes < 1:
return f"{int(minutes * 60)}s"
if minutes < 60:
return f"{int(minutes)}m"
if minutes < 1440:
hours = minutes / 60.0
return f"{hours:.1f}h"
days = minutes / 1440.0
return f"{days:.1f}d"
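# e.g. 0.5 -> "30s", 45 -> "45m", 90 -> "1.5h", 2880 -> "2.0d"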
# --------------------------
# Stats and utilities
# --------------------------
async def get_total_records(self) -> int:
"""Get total number of records across all tables"""
if not self.pool:
return 0
async with self.pool.acquire() as conn:
result = await conn.fetchrow(
"""
SELECT
(SELECT COUNT(*) FROM crypto_ticks) +
(SELECT COUNT(*) FROM crypto_ohlcv) +
(SELECT COUNT(*) FROM technical_indicators) as total;
"""
)
return int(result['total']) if result else 0
async def get_last_update_time(self) -> str:
"""Get the last update time across all tables"""
if not self.pool:
return "Never"
async with self.pool.acquire() as conn:
result = await conn.fetchrow(
"""
SELECT MAX(last_update) as last_update FROM (
SELECT MAX(time) as last_update FROM crypto_ticks
UNION ALL
SELECT MAX(time) as last_update FROM crypto_ohlcv
UNION ALL
SELECT MAX(time) as last_update FROM technical_indicators
) sub;
"""
)
if result and result['last_update']:
return result['last_update'].isoformat()
return "Never"
async def get_symbol_statistics(self, symbol: str) -> Dict[str, Any]:
"""Get statistics for a specific symbol"""
if not self.pool:
return {}
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
stats = await conn.fetchrow(
"""
SELECT
(SELECT COUNT(*) FROM crypto_ticks WHERE symbol = $1) as tick_count,
(SELECT COUNT(*) FROM crypto_ohlcv WHERE symbol = $1) as ohlcv_count,
(SELECT COUNT(*) FROM technical_indicators WHERE symbol = $1) as indicator_count,
(SELECT MIN(time) FROM crypto_ohlcv WHERE symbol = $1) as first_record,
(SELECT MAX(time) FROM crypto_ohlcv WHERE symbol = $1) as last_record;
""",
symbol_u
)
return dict(stats) if stats else {}
async def get_all_symbols_summary(self) -> List[Dict[str, Any]]:
"""Get summary statistics for all symbols"""
if not self.pool:
return []
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT
symbol,
COUNT(DISTINCT interval) as intervals_count,
MIN(time) as first_record,
MAX(time) as last_record,
COUNT(*) as total_candles
FROM crypto_ohlcv
GROUP BY symbol
ORDER BY symbol;
"""
)
return [dict(row) for row in rows]
async def get_all_gaps_summary(self) -> List[Dict[str, Any]]:
"""Get gap summary for all symbols and intervals"""
symbols = await self.get_available_symbols()
intervals = ['1m', '5m', '15m', '1h', '4h', '1d']
summary: List[Dict[str, Any]] = []
for symbol in symbols:
for interval in intervals:
try:
gap_info = await self.detect_gaps(symbol, interval)
coverage = gap_info['coverage']
if coverage.get('total_records', 0) > 0:
summary.append({
'symbol': symbol,
'interval': interval,
'total_gaps': len(gap_info['gaps']),
'fillable_gaps': len([g for g in gap_info['gaps'] if g['missing_candles'] < 1000]),
'total_missing_candles': sum(g['missing_candles'] for g in gap_info['gaps']),
'coverage_percent': coverage['coverage_percent'],
})
except Exception as e:
self.logger.warning(f"Error getting gaps for {symbol} {interval}: {e}")
return summary
async def get_current_price_and_trends_with_volume(self, symbol: str) -> Optional[Dict[str, Any]]:
"""
Get current price, percentage changes, and robust 15m volume anomaly status.
Baseline blends last 4h, 24h, and 7d per-minute stats (excluding the most recent 15m).
"""
if not self.pool:
return None
symbol_u = _safe_upper(symbol)
now = _utc_now()
time_15m_ago = now - timedelta(minutes=15)
time_1h_ago = now - timedelta(hours=1)
time_1d_ago = now - timedelta(days=1)
time_1w_ago = now - timedelta(weeks=1)
baseline_end = time_15m_ago
start_4h = baseline_end - timedelta(hours=4)
start_24h = baseline_end - timedelta(hours=24)
start_7d = baseline_end - timedelta(days=7)
async with self.pool.acquire() as conn:
try:
current = await conn.fetchrow(
"""
SELECT close_price, volume, time
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1m'
ORDER BY time DESC
LIMIT 1;
""",
symbol_u
)
if not current:
return None
current_price = float(current['close_price'])
current_time = current['time']
# Price anchors
price_15m = await conn.fetchrow(
"""
SELECT close_price FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1m' AND time <= $2
ORDER BY time DESC LIMIT 1;
""",
symbol_u, time_15m_ago
)
price_1h = await conn.fetchrow(
"""
SELECT close_price FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1h' AND time <= $2
ORDER BY time DESC LIMIT 1;
""",
symbol_u, time_1h_ago
)
price_1d = await conn.fetchrow(
"""
SELECT close_price FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1h' AND time <= $2
ORDER BY time DESC LIMIT 1;
""",
symbol_u, time_1d_ago
)
price_1w = await conn.fetchrow(
"""
SELECT close_price FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1d' AND time <= $2
ORDER BY time DESC LIMIT 1;
""",
symbol_u, time_1w_ago
)
# Baselines and last 15m in one pass
stats = await conn.fetchrow(
"""
SELECT
COALESCE(SUM(volume) FILTER (WHERE time > $2 AND time <= $3), 0) AS vol_15m,
AVG(volume) FILTER (WHERE time > $4 AND time <= $2) AS avg_4h,
STDDEV_SAMP(volume) FILTER (WHERE time > $4 AND time <= $2) AS std_4h,
COUNT(*) FILTER (WHERE time > $4 AND time <= $2) AS n_4h,
AVG(volume) FILTER (WHERE time > $5 AND time <= $2) AS avg_24h,
STDDEV_SAMP(volume) FILTER (WHERE time > $5 AND time <= $2) AS std_24h,
COUNT(*) FILTER (WHERE time > $5 AND time <= $2) AS n_24h,
AVG(volume) FILTER (WHERE time > $6 AND time <= $2) AS avg_7d,
STDDEV_SAMP(volume) FILTER (WHERE time > $6 AND time <= $2) AS std_7d,
COUNT(*) FILTER (WHERE time > $6 AND time <= $2) AS n_7d
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = '1m';
""",
symbol_u, time_15m_ago, current_time, start_4h, start_24h, start_7d
)
last_15m_volume = float(stats['vol_15m']) if stats and stats['vol_15m'] is not None else 0.0
def _to_float(x):
return float(x) if x is not None else None
avg4 = _to_float(stats['avg_4h']); std4 = _to_float(stats['std_4h']); n4 = int(stats['n_4h']) if stats and stats['n_4h'] is not None else 0
avg24 = _to_float(stats['avg_24h']); std24 = _to_float(stats['std_24h']); n24 = int(stats['n_24h']) if stats and stats['n_24h'] is not None else 0
avg7 = _to_float(stats['avg_7d']); std7 = _to_float(stats['std_7d']); n7 = int(stats['n_7d']) if stats and stats['n_7d'] is not None else 0
import math
def scale_to_15m(avg_per_min, std_per_min):
mean15 = (avg_per_min or 0.0) * 15.0
std15 = (std_per_min or 0.0) * math.sqrt(15.0)
return mean15, std15
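# Assuming roughly independent per-minute volumes, a 15-minute sum has
# mean 15*mu and variance 15*sigma^2, hence std sigma*sqrt(15). Example: a
# per-minute average of 10 with std 2 gives an expected 15m volume of 150
# with std of about 7.75.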
mean15_4, std15_4 = scale_to_15m(avg4, std4)
mean15_24, std15_24 = scale_to_15m(avg24, std24)
mean15_7, std15_7 = scale_to_15m(avg7, std7)
# Weights gated by data availability
w4 = 0.6 if n4 >= 60 else 0.0 # at least 1 hour of 1m bars
w24 = 0.3 if n24 >= 240 else 0.0 # at least 4 hours of 1m bars
w7 = 0.1 if n7 >= 2000 else 0.0 # ~>1.4 days of 1m bars
total_w = w4 + w24 + w7
if total_w == 0.0:
candidates = []
if n4 > 0: candidates.append((mean15_4, std15_4, 1.0))
if n24 > 0: candidates.append((mean15_24, std15_24, 1.0))
if n7 > 0: candidates.append((mean15_7, std15_7, 1.0))
total_w = sum(w for _, _, w in candidates) or 1.0
blended_mean15 = sum(m * w for m, _, w in candidates) / total_w
blended_var15 = sum(((s or 0.0) ** 2) * w for _, s, w in candidates) / total_w
else:
blended_mean15 = (mean15_4 * w4 + mean15_24 * w24 + mean15_7 * w7) / total_w
blended_var15 = (((std15_4 or 0.0) ** 2) * w4 +
((std15_24 or 0.0) ** 2) * w24 +
((std15_7 or 0.0) ** 2) * w7) / total_w
blended_std15 = math.sqrt(blended_var15) if blended_var15 > 0 else 0.0
deviation_pct = 0.0
zscore = 0.0
if blended_mean15 > 0:
deviation_pct = ((last_15m_volume - blended_mean15) / blended_mean15) * 100.0
if blended_std15 > 0:
zscore = (last_15m_volume - blended_mean15) / blended_std15
volume_status = "Average"
if blended_mean15 <= 0 and last_15m_volume > 0:
volume_status = "Unusually High"
else:
if zscore >= 2.5 or deviation_pct >= 50.0:
volume_status = "Unusually High"
elif zscore >= 1.5 or deviation_pct >= 20.0:
volume_status = "High"
elif zscore <= -2.5 or deviation_pct <= -50.0:
volume_status = "Unusually Low"
elif zscore <= -1.5 or deviation_pct <= -20.0:
volume_status = "Low"
current_volume = float(current['volume'])
def calc_change(old):
if old and float(old) > 0:
return round(((current_price - float(old)) / float(old)) * 100.0, 2)
return 0.0
return {
'symbol': symbol_u,
'current_price': current_price,
'current_volume': current_volume,
'volume_status': volume_status,
'last_update': current_time.isoformat(),
'trends': {
'15m': calc_change(price_15m['close_price'] if price_15m else None),
'1h': calc_change(price_1h['close_price'] if price_1h else None),
'1d': calc_change(price_1d['close_price'] if price_1d else None),
'1w': calc_change(price_1w['close_price'] if price_1w else None),
},
'volume_context': {
'last_15m_volume': round(last_15m_volume, 8),
'expected_15m_volume': round(blended_mean15, 8),
'zscore': round(zscore, 3),
'deviation_pct': round(deviation_pct, 2),
'baselines': {'n_4h': n4, 'n_24h': n24, 'n_7d': n7}
}
}
except Exception as e:
self.logger.error(f"Error getting price trends with volume for {symbol_u}: {e}", exc_info=True)
return None
async def get_gap_fill_status(self, symbol: str, interval: str) -> Dict[str, Any]:
"""Get gap fill status for a symbol and interval"""
gaps = await self.detect_gaps(symbol, interval)
return {'symbol': _safe_upper(symbol), 'interval': interval, 'gaps': gaps['gaps'], 'coverage': gaps['coverage']}
async def fill_genuine_gaps_with_averages(self, symbol: str, interval: str,
max_consecutive: int = 5, lookback: int = 10) -> int:
"""Fill genuine empty gaps with intelligent averaging"""
self.logger.info(f"Filling genuine gaps for {symbol} {interval}")
gaps_info = await self.detect_gaps(symbol, interval)
gaps = gaps_info.get("gaps", [])
if not gaps:
return 0
filled_count = 0
for gap in gaps:
gap_start = _ensure_dt_aware_utc(datetime.fromisoformat(gap['gap_start']))
gap_end = _ensure_dt_aware_utc(datetime.fromisoformat(gap['gap_end']))
missing_candles = gap['missing_candles']
if missing_candles > max_consecutive:
continue
try:
filled = await self._fill_gap_with_averaging(symbol, interval, gap_start, gap_end, missing_candles)
filled_count += filled
except Exception as e:
self.logger.error(f"Error filling gap with averaging: {e}")
continue
return filled_count
async def cleanup_old_data(self, retention_days: int = 365):
"""Clean up old data based on retention policy"""
if not self.pool:
return
cutoff_date = _utc_now() - timedelta(days=retention_days)
tick_cutoff = _utc_now() - timedelta(days=min(retention_days, 30))
async with self.pool.acquire() as conn:
await conn.execute("DELETE FROM crypto_ticks WHERE time < $1;", tick_cutoff)
await conn.execute("DELETE FROM crypto_ohlcv WHERE time < $1;", cutoff_date)
await conn.execute("DELETE FROM technical_indicators WHERE time < $1;", cutoff_date)
self.logger.info(f"Cleaned up data older than {retention_days} days")
async def vacuum_analyze(self):
"""Perform database maintenance"""
if not self.pool:
return
async with self.pool.acquire() as conn:
tables = ['crypto_ticks', 'crypto_ohlcv', 'technical_indicators']
for t in tables:
try:
await conn.execute(f"VACUUM ANALYZE {t};")
except Exception as e:
self.logger.warning(f"VACUUM ANALYZE warning on {t}: {e}")
self.logger.info("Database vacuum and analyze completed")
@asynccontextmanager
async def transaction(self):
"""Context manager for database transactions"""
if not self.pool:
raise RuntimeError("Pool not initialized")
async with self.pool.acquire() as conn:
async with conn.transaction():
yield conn
# --------------------------
# Indicator Coverage Methods
# --------------------------
REQUIRED_INDICATORS = [
'adx_14', 'atr_14', 'bb_lower', 'bb_middle', 'bb_upper',
'ema_12', 'ema_26', 'macd_histogram', 'macd_line', 'macd_signal',
'rsi_14', 'sma_20', 'sma_200', 'sma_50', 'stoch_d', 'stoch_k'
]
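# NOTE: this list must stay in sync with the indicator names emitted by
# utils.calculate_technical_indicators(); the coverage queries below compare
# per-timestamp counts against len(REQUIRED_INDICATORS).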
async def check_indicator_coverage(self, symbol: str, interval: str) -> Dict[str, Any]:
"""
Check indicator coverage for a symbol/interval.
Returns detailed coverage statistics showing which OHLCV records are missing indicators.
"""
if not self.pool:
return {'total_ohlcv': 0, 'complete_records': 0, 'coverage_percent': 0.0}
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
# Get total OHLCV records
total_result = await conn.fetchrow(
"""
SELECT COUNT(*) as total
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2;
""",
symbol_u, interval
)
total_ohlcv = int(total_result['total']) if total_result else 0
if total_ohlcv == 0:
return {
'symbol': symbol_u,
'interval': interval,
'total_ohlcv': 0,
'complete_records': 0,
'incomplete_records': 0,
'coverage_percent': 0.0,
'missing_indicators': [],
}
# Count OHLCV records that have ALL 16 required indicators
complete_result = await conn.fetchrow(
"""
WITH ohlcv_times AS (
SELECT time
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
),
indicator_counts AS (
SELECT
o.time,
COUNT(DISTINCT ti.indicator_name) as indicator_count
FROM ohlcv_times o
LEFT JOIN technical_indicators ti ON
ti.time = o.time AND
ti.symbol = $1 AND
ti.interval = $2
GROUP BY o.time
)
SELECT
COUNT(*) FILTER (WHERE indicator_count = $3) as complete_count,
COUNT(*) FILTER (WHERE indicator_count < $3) as incomplete_count
FROM indicator_counts;
""",
symbol_u, interval, len(self.REQUIRED_INDICATORS)
)
complete_count = int(complete_result['complete_count']) if complete_result else 0
incomplete_count = int(complete_result['incomplete_count']) if complete_result else total_ohlcv
coverage_percent = (complete_count / total_ohlcv * 100) if total_ohlcv > 0 else 0.0
# Get list of indicators with missing coverage
missing_indicators_result = await conn.fetch(
"""
WITH ohlcv_count AS (
SELECT COUNT(*) as total
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
)
SELECT
indicator_name,
COUNT(*) as present_count,
(SELECT total FROM ohlcv_count) as total_ohlcv,
ROUND(COUNT(*)::decimal / (SELECT total FROM ohlcv_count) * 100, 2) as coverage_pct
FROM technical_indicators
WHERE symbol = $1 AND interval = $2
GROUP BY indicator_name
ORDER BY coverage_pct ASC;
""",
symbol_u, interval
)
missing_indicators = []
present_indicators = {row['indicator_name']: {
'present': int(row['present_count']),
'missing': total_ohlcv - int(row['present_count']),
'coverage_pct': float(row['coverage_pct'])
} for row in missing_indicators_result}
for req_ind in self.REQUIRED_INDICATORS:
if req_ind not in present_indicators:
missing_indicators.append({
'indicator': req_ind,
'present': 0,
'missing': total_ohlcv,
'coverage_pct': 0.0
})
elif present_indicators[req_ind]['missing'] > 0:
missing_indicators.append({
'indicator': req_ind,
'present': present_indicators[req_ind]['present'],
'missing': present_indicators[req_ind]['missing'],
'coverage_pct': present_indicators[req_ind]['coverage_pct']
})
return {
'symbol': symbol_u,
'interval': interval,
'total_ohlcv': total_ohlcv,
'complete_records': complete_count,
'incomplete_records': incomplete_count,
'coverage_percent': round(coverage_percent, 2),
'missing_indicators': missing_indicators,
'required_indicators': len(self.REQUIRED_INDICATORS)
}
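# Typical result shape (illustrative numbers):
#   {'symbol': 'BTCUSDT', 'interval': '1h', 'total_ohlcv': 10000,
#    'complete_records': 9800, 'incomplete_records': 200,
#    'coverage_percent': 98.0, 'required_indicators': 16,
#    'missing_indicators': [{'indicator': 'sma_200', 'present': 9800,
#                            'missing': 200, 'coverage_pct': 98.0}]}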
async def get_ohlcv_missing_indicators(self, symbol: str, interval: str,
limit: int = 1000) -> List[Dict[str, Any]]:
"""
Get OHLCV records that are missing technical indicators.
Returns time ranges that need indicator calculation.
"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
WITH ohlcv_with_counts AS (
SELECT
o.time,
COUNT(DISTINCT ti.indicator_name) as indicator_count
FROM crypto_ohlcv o
LEFT JOIN technical_indicators ti ON
ti.time = o.time AND
ti.symbol = o.symbol AND
ti.interval = o.interval
WHERE o.symbol = $1 AND o.interval = $2
GROUP BY o.time
HAVING COUNT(DISTINCT ti.indicator_name) < $3
ORDER BY o.time DESC
LIMIT $4
)
SELECT time, indicator_count
FROM ohlcv_with_counts;
""",
symbol_u, interval, len(self.REQUIRED_INDICATORS), limit
)
return [{'time': row['time'].isoformat(),
'existing_indicators': int(row['indicator_count'])} for row in rows]
async def backfill_missing_indicators(self, symbol: str, interval: str,
batch_size: int = 200) -> Dict[str, Any]:
"""
Backfill missing technical indicators for existing OHLCV records.
Processes in batches to avoid memory issues.
"""
self.logger.info(f"Starting indicator backfill for {symbol} {interval}")
symbol_u = _safe_upper(symbol)
total_processed = 0
total_indicators_added = 0
try:
# Get coverage before backfill
coverage_before = await self.check_indicator_coverage(symbol_u, interval)
if coverage_before['coverage_percent'] >= 99.9:
return {
'status': 'success',
'message': 'Indicator coverage already complete',
'records_processed': 0,
'indicators_added': 0,
'coverage_before': coverage_before['coverage_percent'],
'coverage_after': coverage_before['coverage_percent']
}
# Process in batches
while True:
# Get batch of OHLCV records needing indicators
missing_times = await self.get_ohlcv_missing_indicators(
symbol_u, interval, limit=batch_size
)
if not missing_times:
break
# Fetch OHLCV data for calculation (need history for indicators)
# Get earliest time in batch and fetch enough history
earliest_time = min(datetime.fromisoformat(t['time']) for t in missing_times)
# Fetch sufficient historical data (250 candles for SMA-200)
lookback_time = earliest_time - timedelta(
seconds=self._interval_to_seconds(interval) * 250
)
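# get_ohlcv_missing_indicators orders newest-first, so missing_times[0]
# is the latest missing record and bounds the fetch range below.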
ohlcv_data = await self.get_ohlcv_data_range(
symbol_u, interval, lookback_time,
datetime.fromisoformat(missing_times[0]['time'])
)
if len(ohlcv_data) < 50:
self.logger.warning(
f"Insufficient data for indicators: {len(ohlcv_data)} records"
)
break
# Calculate indicators using utils
from utils import calculate_technical_indicators, load_config
import pandas as pd
config = load_config()
indicator_config = config.get('technical_indicators', {})
df = pd.DataFrame(ohlcv_data)
df['time'] = pd.to_datetime(df['time'])
df = df.sort_values('time')
df.set_index('time', inplace=True)
# Rename for pandas_ta
df = df.rename(columns={
'open_price': 'open',
'high_price': 'high',
'low_price': 'low',
'close_price': 'close'
})
indicators_data = calculate_technical_indicators(df, indicator_config)
batch_added = 0
if indicators_data:
# Filter to only missing times
missing_time_set = {t['time'] for t in missing_times}
filtered_indicators = [
ind for ind in indicators_data
if ind['time'].isoformat() in missing_time_set
]
if filtered_indicators:
await self.insert_indicators_batch(symbol_u, interval, filtered_indicators)
batch_added = len(filtered_indicators)
total_indicators_added += batch_added
total_processed += len(missing_times)
self.logger.info(
f"Backfilled {batch_added} indicators for "
f"{len(missing_times)} OHLCV records"
)
if batch_added == 0:
# Nothing was produced for this batch (e.g. too little history for
# long-window indicators); stop rather than refetch the same missing
# times on every iteration until the safety valve below trips.
self.logger.warning("No indicators produced for this batch; stopping backfill early")
break
# Small delay to avoid overwhelming the database
await asyncio.sleep(0.1)
# Safety check: if we've processed many batches, break
if total_processed >= 10000:
self.logger.warning("Processed 10k+ records, stopping this batch")
break
# Get coverage after backfill
coverage_after = await self.check_indicator_coverage(symbol_u, interval)
return {
'status': 'success',
'symbol': symbol_u,
'interval': interval,
'records_processed': total_processed,
'indicators_added': total_indicators_added,
'coverage_before': coverage_before['coverage_percent'],
'coverage_after': coverage_after['coverage_percent'],
'improvement': round(coverage_after['coverage_percent'] -
coverage_before['coverage_percent'], 2)
}
except Exception as e:
self.logger.error(f"Error backfilling indicators: {e}", exc_info=True)
return {
'status': 'error',
'error': str(e),
'records_processed': total_processed,
'indicators_added': total_indicators_added
}
async def get_ohlcv_data_range(self, symbol: str, interval: str,
start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
"""Get OHLCV data for a specific time range (for indicator calculation)"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
start_u = _ensure_dt_aware_utc(start_time)
end_u = _ensure_dt_aware_utc(end_time)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT time, symbol, exchange, interval, open_price, high_price,
low_price, close_price, volume, quote_volume, trade_count
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
AND time >= $3 AND time <= $4
ORDER BY time ASC;
""",
symbol_u, interval, start_u, end_u
)
return [dict(row) for row in rows]
async def get_all_indicator_coverage_status(self) -> List[Dict[str, Any]]:
"""Get indicator coverage status for all symbol/interval combinations"""
symbols = await self.get_available_symbols()
intervals = ['1m', '5m', '15m', '1h', '4h', '1d']
results = []
for symbol in symbols:
for interval in intervals:
try:
coverage = await self.check_indicator_coverage(symbol, interval)
if coverage['total_ohlcv'] > 0:
results.append(coverage)
except Exception as e:
self.logger.error(f"Error checking coverage for {symbol} {interval}: {e}")
return results
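# A plausible end-to-end workflow (sketch; assumes an initialized manager):
#   status = await db.get_all_indicator_coverage_status()
#   for entry in status:
#       if entry['coverage_percent'] < 99.9:
#           await db.backfill_missing_indicators(entry['symbol'], entry['interval'])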
async def close(self):
"""Close database connection pool"""
if self.pool:
await self.pool.close()
self.logger.info("Database connection pool closed")
# Utility function to create the database if it does not exist
async def create_database_if_not_exists():
"""Create database if it doesn't exist (connect via postgres db)"""
conn = await asyncpg.connect(
host=os.getenv('DB_HOST', 'localhost'),
port=int(os.getenv('DB_PORT', 5432)),
database='postgres',
user=os.getenv('DB_USER', 'postgres'),
password=os.getenv('DB_PASSWORD', 'password'),
)
db_name = os.getenv('DB_NAME', 'crypto_trading')
try:
exists = await conn.fetchval("SELECT 1 FROM pg_database WHERE datname = $1", db_name)
if not exists:
await conn.execute(f'CREATE DATABASE "{db_name}";')
print(f"Database '{db_name}' created successfully")
else:
print(f"Database '{db_name}' already exists")
except Exception as e:
print(f"Error creating database: {e}")
finally:
await conn.close()
if __name__ == "__main__":
async def test_db():
await create_database_if_not_exists()
db = DatabaseManager()
await db.initialize()
stats = await db.get_total_records()
print(f"Total records: {stats}")
await db.close()
asyncio.run(test_db())