feat: Add 100% technical indicator coverage tracking and backfilling
- Add check_indicator_coverage() to detect OHLCV records missing indicators - Add get_ohlcv_missing_indicators() to identify specific gaps - Add backfill_missing_indicators() to fill missing indicator data - Add get_ohlcv_data_range() helper for fetching historical data - Add get_all_indicator_coverage_status() for system-wide monitoring - Define REQUIRED_INDICATORS constant with all 16 required indicators - Process backfills in configurable batches to manage memory - Calculate indicators using existing utils.calculate_technical_indicators() - Track coverage statistics before/after backfill operations - Support for automated indicator completeness verification This ensures every crypto_ohlcv record has all 16 technical indicators (adx_14, atr_14, bb_lower, bb_middle, bb_upper, ema_12, ema_26, macd_histogram, macd_line, macd_signal, rsi_14, sma_20, sma_200, sma_50, stoch_d, stoch_k) calculated and stored
This commit is contained in:
338
db.py
338
db.py
@@ -1635,6 +1635,343 @@ class DatabaseManager:
|
||||
async with conn.transaction():
|
||||
yield conn
|
||||
|
||||
# --------------------------
|
||||
# Indicator Coverage Methods
|
||||
# --------------------------
|
||||
|
||||
REQUIRED_INDICATORS = [
|
||||
'adx_14', 'atr_14', 'bb_lower', 'bb_middle', 'bb_upper',
|
||||
'ema_12', 'ema_26', 'macd_histogram', 'macd_line', 'macd_signal',
|
||||
'rsi_14', 'sma_20', 'sma_200', 'sma_50', 'stoch_d', 'stoch_k'
|
||||
]
|
||||
|
||||
async def check_indicator_coverage(self, symbol: str, interval: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Check indicator coverage for a symbol/interval.
|
||||
Returns detailed coverage statistics showing which OHLCV records are missing indicators.
|
||||
"""
|
||||
if not self.pool:
|
||||
return {'total_ohlcv': 0, 'complete_records': 0, 'coverage_percent': 0.0}
|
||||
|
||||
symbol_u = _safe_upper(symbol)
|
||||
|
||||
async with self.pool.acquire() as conn:
|
||||
# Get total OHLCV records
|
||||
total_result = await conn.fetchrow(
|
||||
"""
|
||||
SELECT COUNT(*) as total
|
||||
FROM crypto_ohlcv
|
||||
WHERE symbol = $1 AND interval = $2;
|
||||
""",
|
||||
symbol_u, interval
|
||||
)
|
||||
total_ohlcv = int(total_result['total']) if total_result else 0
|
||||
|
||||
if total_ohlcv == 0:
|
||||
return {
|
||||
'symbol': symbol_u,
|
||||
'interval': interval,
|
||||
'total_ohlcv': 0,
|
||||
'complete_records': 0,
|
||||
'incomplete_records': 0,
|
||||
'coverage_percent': 0.0,
|
||||
'missing_indicators': [],
|
||||
}
|
||||
|
||||
# Count OHLCV records that have ALL 16 required indicators
|
||||
complete_result = await conn.fetchrow(
|
||||
"""
|
||||
WITH ohlcv_times AS (
|
||||
SELECT time
|
||||
FROM crypto_ohlcv
|
||||
WHERE symbol = $1 AND interval = $2
|
||||
),
|
||||
indicator_counts AS (
|
||||
SELECT
|
||||
o.time,
|
||||
COUNT(DISTINCT ti.indicator_name) as indicator_count
|
||||
FROM ohlcv_times o
|
||||
LEFT JOIN technical_indicators ti ON
|
||||
ti.time = o.time AND
|
||||
ti.symbol = $1 AND
|
||||
ti.interval = $2
|
||||
GROUP BY o.time
|
||||
)
|
||||
SELECT
|
||||
COUNT(*) FILTER (WHERE indicator_count = $3) as complete_count,
|
||||
COUNT(*) FILTER (WHERE indicator_count < $3) as incomplete_count
|
||||
FROM indicator_counts;
|
||||
""",
|
||||
symbol_u, interval, len(self.REQUIRED_INDICATORS)
|
||||
)
|
||||
|
||||
complete_count = int(complete_result['complete_count']) if complete_result else 0
|
||||
incomplete_count = int(complete_result['incomplete_count']) if complete_result else total_ohlcv
|
||||
coverage_percent = (complete_count / total_ohlcv * 100) if total_ohlcv > 0 else 0.0
|
||||
|
||||
# Get list of indicators with missing coverage
|
||||
missing_indicators_result = await conn.fetch(
|
||||
"""
|
||||
WITH ohlcv_count AS (
|
||||
SELECT COUNT(*) as total
|
||||
FROM crypto_ohlcv
|
||||
WHERE symbol = $1 AND interval = $2
|
||||
)
|
||||
SELECT
|
||||
indicator_name,
|
||||
COUNT(*) as present_count,
|
||||
(SELECT total FROM ohlcv_count) as total_ohlcv,
|
||||
ROUND(COUNT(*)::decimal / (SELECT total FROM ohlcv_count) * 100, 2) as coverage_pct
|
||||
FROM technical_indicators
|
||||
WHERE symbol = $1 AND interval = $2
|
||||
GROUP BY indicator_name
|
||||
ORDER BY coverage_pct ASC;
|
||||
""",
|
||||
symbol_u, interval
|
||||
)
|
||||
|
||||
missing_indicators = []
|
||||
present_indicators = {row['indicator_name']: {
|
||||
'present': int(row['present_count']),
|
||||
'missing': total_ohlcv - int(row['present_count']),
|
||||
'coverage_pct': float(row['coverage_pct'])
|
||||
} for row in missing_indicators_result}
|
||||
|
||||
for req_ind in self.REQUIRED_INDICATORS:
|
||||
if req_ind not in present_indicators:
|
||||
missing_indicators.append({
|
||||
'indicator': req_ind,
|
||||
'present': 0,
|
||||
'missing': total_ohlcv,
|
||||
'coverage_pct': 0.0
|
||||
})
|
||||
elif present_indicators[req_ind]['missing'] > 0:
|
||||
missing_indicators.append({
|
||||
'indicator': req_ind,
|
||||
'present': present_indicators[req_ind]['present'],
|
||||
'missing': present_indicators[req_ind]['missing'],
|
||||
'coverage_pct': present_indicators[req_ind]['coverage_pct']
|
||||
})
|
||||
|
||||
return {
|
||||
'symbol': symbol_u,
|
||||
'interval': interval,
|
||||
'total_ohlcv': total_ohlcv,
|
||||
'complete_records': complete_count,
|
||||
'incomplete_records': incomplete_count,
|
||||
'coverage_percent': round(coverage_percent, 2),
|
||||
'missing_indicators': missing_indicators,
|
||||
'required_indicators': len(self.REQUIRED_INDICATORS)
|
||||
}
|
||||
|
||||
async def get_ohlcv_missing_indicators(self, symbol: str, interval: str,
|
||||
limit: int = 1000) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get OHLCV records that are missing technical indicators.
|
||||
Returns time ranges that need indicator calculation.
|
||||
"""
|
||||
if not self.pool:
|
||||
return []
|
||||
|
||||
symbol_u = _safe_upper(symbol)
|
||||
|
||||
async with self.pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""
|
||||
WITH ohlcv_with_counts AS (
|
||||
SELECT
|
||||
o.time,
|
||||
COUNT(DISTINCT ti.indicator_name) as indicator_count
|
||||
FROM crypto_ohlcv o
|
||||
LEFT JOIN technical_indicators ti ON
|
||||
ti.time = o.time AND
|
||||
ti.symbol = o.symbol AND
|
||||
ti.interval = o.interval
|
||||
WHERE o.symbol = $1 AND o.interval = $2
|
||||
GROUP BY o.time
|
||||
HAVING COUNT(DISTINCT ti.indicator_name) < $3
|
||||
ORDER BY o.time DESC
|
||||
LIMIT $4
|
||||
)
|
||||
SELECT time, indicator_count
|
||||
FROM ohlcv_with_counts;
|
||||
""",
|
||||
symbol_u, interval, len(self.REQUIRED_INDICATORS), limit
|
||||
)
|
||||
|
||||
return [{'time': row['time'].isoformat(),
|
||||
'existing_indicators': int(row['indicator_count'])} for row in rows]
|
||||
|
||||
async def backfill_missing_indicators(self, symbol: str, interval: str,
|
||||
batch_size: int = 200) -> Dict[str, Any]:
|
||||
"""
|
||||
Backfill missing technical indicators for existing OHLCV records.
|
||||
Processes in batches to avoid memory issues.
|
||||
"""
|
||||
self.logger.info(f"Starting indicator backfill for {symbol} {interval}")
|
||||
|
||||
symbol_u = _safe_upper(symbol)
|
||||
total_processed = 0
|
||||
total_indicators_added = 0
|
||||
|
||||
try:
|
||||
# Get coverage before backfill
|
||||
coverage_before = await self.check_indicator_coverage(symbol_u, interval)
|
||||
|
||||
if coverage_before['coverage_percent'] >= 99.9:
|
||||
return {
|
||||
'status': 'success',
|
||||
'message': 'Indicator coverage already complete',
|
||||
'records_processed': 0,
|
||||
'indicators_added': 0,
|
||||
'coverage_before': coverage_before['coverage_percent'],
|
||||
'coverage_after': coverage_before['coverage_percent']
|
||||
}
|
||||
|
||||
# Process in batches
|
||||
while True:
|
||||
# Get batch of OHLCV records needing indicators
|
||||
missing_times = await self.get_ohlcv_missing_indicators(
|
||||
symbol_u, interval, limit=batch_size
|
||||
)
|
||||
|
||||
if not missing_times:
|
||||
break
|
||||
|
||||
# Fetch OHLCV data for calculation (need history for indicators)
|
||||
# Get earliest time in batch and fetch enough history
|
||||
earliest_time = min(datetime.fromisoformat(t['time']) for t in missing_times)
|
||||
|
||||
# Fetch sufficient historical data (250 candles for SMA-200)
|
||||
lookback_time = earliest_time - timedelta(
|
||||
seconds=self._interval_to_seconds(interval) * 250
|
||||
)
|
||||
|
||||
ohlcv_data = await self.get_ohlcv_data_range(
|
||||
symbol_u, interval, lookback_time,
|
||||
datetime.fromisoformat(missing_times[0]['time'])
|
||||
)
|
||||
|
||||
if len(ohlcv_data) < 50:
|
||||
self.logger.warning(
|
||||
f"Insufficient data for indicators: {len(ohlcv_data)} records"
|
||||
)
|
||||
break
|
||||
|
||||
# Calculate indicators using utils
|
||||
from utils import calculate_technical_indicators, load_config
|
||||
import pandas as pd
|
||||
|
||||
config = load_config()
|
||||
indicator_config = config.get('technical_indicators', {})
|
||||
|
||||
df = pd.DataFrame(ohlcv_data)
|
||||
df['time'] = pd.to_datetime(df['time'])
|
||||
df = df.sort_values('time')
|
||||
df.set_index('time', inplace=True)
|
||||
|
||||
# Rename for pandas_ta
|
||||
df = df.rename(columns={
|
||||
'open_price': 'open',
|
||||
'high_price': 'high',
|
||||
'low_price': 'low',
|
||||
'close_price': 'close'
|
||||
})
|
||||
|
||||
indicators_data = calculate_technical_indicators(df, indicator_config)
|
||||
|
||||
if indicators_data:
|
||||
# Filter to only missing times
|
||||
missing_time_set = {t['time'] for t in missing_times}
|
||||
filtered_indicators = [
|
||||
ind for ind in indicators_data
|
||||
if ind['time'].isoformat() in missing_time_set
|
||||
]
|
||||
|
||||
if filtered_indicators:
|
||||
await self.insert_indicators_batch(symbol_u, interval, filtered_indicators)
|
||||
total_indicators_added += len(filtered_indicators)
|
||||
total_processed += len(missing_times)
|
||||
|
||||
self.logger.info(
|
||||
f"Backfilled {len(filtered_indicators)} indicators for "
|
||||
f"{len(missing_times)} OHLCV records"
|
||||
)
|
||||
|
||||
# Small delay to avoid overwhelming the database
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
# Safety check: if we've processed many batches, break
|
||||
if total_processed >= 10000:
|
||||
self.logger.warning("Processed 10k+ records, stopping this batch")
|
||||
break
|
||||
|
||||
# Get coverage after backfill
|
||||
coverage_after = await self.check_indicator_coverage(symbol_u, interval)
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'symbol': symbol_u,
|
||||
'interval': interval,
|
||||
'records_processed': total_processed,
|
||||
'indicators_added': total_indicators_added,
|
||||
'coverage_before': coverage_before['coverage_percent'],
|
||||
'coverage_after': coverage_after['coverage_percent'],
|
||||
'improvement': round(coverage_after['coverage_percent'] -
|
||||
coverage_before['coverage_percent'], 2)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error backfilling indicators: {e}", exc_info=True)
|
||||
return {
|
||||
'status': 'error',
|
||||
'error': str(e),
|
||||
'records_processed': total_processed,
|
||||
'indicators_added': total_indicators_added
|
||||
}
|
||||
|
||||
async def get_ohlcv_data_range(self, symbol: str, interval: str,
|
||||
start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
|
||||
"""Get OHLCV data for a specific time range (for indicator calculation)"""
|
||||
if not self.pool:
|
||||
return []
|
||||
|
||||
symbol_u = _safe_upper(symbol)
|
||||
start_u = _ensure_dt_aware_utc(start_time)
|
||||
end_u = _ensure_dt_aware_utc(end_time)
|
||||
|
||||
async with self.pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""
|
||||
SELECT time, symbol, exchange, interval, open_price, high_price,
|
||||
low_price, close_price, volume, quote_volume, trade_count
|
||||
FROM crypto_ohlcv
|
||||
WHERE symbol = $1 AND interval = $2
|
||||
AND time >= $3 AND time <= $4
|
||||
ORDER BY time ASC;
|
||||
""",
|
||||
symbol_u, interval, start_u, end_u
|
||||
)
|
||||
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
async def get_all_indicator_coverage_status(self) -> List[Dict[str, Any]]:
|
||||
"""Get indicator coverage status for all symbol/interval combinations"""
|
||||
symbols = await self.get_available_symbols()
|
||||
intervals = ['1m', '5m', '15m', '1h', '4h', '1d']
|
||||
|
||||
results = []
|
||||
for symbol in symbols:
|
||||
for interval in intervals:
|
||||
try:
|
||||
coverage = await self.check_indicator_coverage(symbol, interval)
|
||||
if coverage['total_ohlcv'] > 0:
|
||||
results.append(coverage)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error checking coverage for {symbol} {interval}: {e}")
|
||||
|
||||
return results
|
||||
|
||||
async def close(self):
|
||||
"""Close database connection pool"""
|
||||
if self.pool:
|
||||
@@ -1665,7 +2002,6 @@ async def create_database_if_not_exists():
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
async def test_db():
|
||||
await create_database_if_not_exists()
|
||||
|
Reference in New Issue
Block a user