From 2708bcb1765ae7a80ac94796235476be75712aa9 Mon Sep 17 00:00:00 2001 From: lewismac Date: Thu, 9 Oct 2025 08:46:25 +0100 Subject: [PATCH] feat: Add 100% technical indicator coverage tracking and backfilling - Add check_indicator_coverage() to detect OHLCV records missing indicators - Add get_ohlcv_missing_indicators() to identify specific gaps - Add backfill_missing_indicators() to fill missing indicator data - Add get_ohlcv_data_range() helper for fetching historical data - Add get_all_indicator_coverage_status() for system-wide monitoring - Define REQUIRED_INDICATORS constant with all 16 required indicators - Process backfills in configurable batches to manage memory - Calculate indicators using existing utils.calculate_technical_indicators() - Track coverage statistics before/after backfill operations - Support for automated indicator completeness verification This ensures every crypto_ohlcv record has all 16 technical indicators (adx_14, atr_14, bb_lower, bb_middle, bb_upper, ema_12, ema_26, macd_histogram, macd_line, macd_signal, rsi_14, sma_20, sma_200, sma_50, stoch_d, stoch_k) calculated and stored --- db.py | 338 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 337 insertions(+), 1 deletion(-) diff --git a/db.py b/db.py index 520657e..12fb81d 100644 --- a/db.py +++ b/db.py @@ -1635,6 +1635,343 @@ class DatabaseManager: async with conn.transaction(): yield conn + # -------------------------- + # Indicator Coverage Methods + # -------------------------- + + REQUIRED_INDICATORS = [ + 'adx_14', 'atr_14', 'bb_lower', 'bb_middle', 'bb_upper', + 'ema_12', 'ema_26', 'macd_histogram', 'macd_line', 'macd_signal', + 'rsi_14', 'sma_20', 'sma_200', 'sma_50', 'stoch_d', 'stoch_k' + ] + + async def check_indicator_coverage(self, symbol: str, interval: str) -> Dict[str, Any]: + """ + Check indicator coverage for a symbol/interval. + Returns detailed coverage statistics showing which OHLCV records are missing indicators. + """ + if not self.pool: + return {'total_ohlcv': 0, 'complete_records': 0, 'coverage_percent': 0.0} + + symbol_u = _safe_upper(symbol) + + async with self.pool.acquire() as conn: + # Get total OHLCV records + total_result = await conn.fetchrow( + """ + SELECT COUNT(*) as total + FROM crypto_ohlcv + WHERE symbol = $1 AND interval = $2; + """, + symbol_u, interval + ) + total_ohlcv = int(total_result['total']) if total_result else 0 + + if total_ohlcv == 0: + return { + 'symbol': symbol_u, + 'interval': interval, + 'total_ohlcv': 0, + 'complete_records': 0, + 'incomplete_records': 0, + 'coverage_percent': 0.0, + 'missing_indicators': [], + } + + # Count OHLCV records that have ALL 16 required indicators + complete_result = await conn.fetchrow( + """ + WITH ohlcv_times AS ( + SELECT time + FROM crypto_ohlcv + WHERE symbol = $1 AND interval = $2 + ), + indicator_counts AS ( + SELECT + o.time, + COUNT(DISTINCT ti.indicator_name) as indicator_count + FROM ohlcv_times o + LEFT JOIN technical_indicators ti ON + ti.time = o.time AND + ti.symbol = $1 AND + ti.interval = $2 + GROUP BY o.time + ) + SELECT + COUNT(*) FILTER (WHERE indicator_count = $3) as complete_count, + COUNT(*) FILTER (WHERE indicator_count < $3) as incomplete_count + FROM indicator_counts; + """, + symbol_u, interval, len(self.REQUIRED_INDICATORS) + ) + + complete_count = int(complete_result['complete_count']) if complete_result else 0 + incomplete_count = int(complete_result['incomplete_count']) if complete_result else total_ohlcv + coverage_percent = (complete_count / total_ohlcv * 100) if total_ohlcv > 0 else 0.0 + + # Get list of indicators with missing coverage + missing_indicators_result = await conn.fetch( + """ + WITH ohlcv_count AS ( + SELECT COUNT(*) as total + FROM crypto_ohlcv + WHERE symbol = $1 AND interval = $2 + ) + SELECT + indicator_name, + COUNT(*) as present_count, + (SELECT total FROM ohlcv_count) as total_ohlcv, + ROUND(COUNT(*)::decimal / (SELECT total FROM ohlcv_count) * 100, 2) as coverage_pct + FROM technical_indicators + WHERE symbol = $1 AND interval = $2 + GROUP BY indicator_name + ORDER BY coverage_pct ASC; + """, + symbol_u, interval + ) + + missing_indicators = [] + present_indicators = {row['indicator_name']: { + 'present': int(row['present_count']), + 'missing': total_ohlcv - int(row['present_count']), + 'coverage_pct': float(row['coverage_pct']) + } for row in missing_indicators_result} + + for req_ind in self.REQUIRED_INDICATORS: + if req_ind not in present_indicators: + missing_indicators.append({ + 'indicator': req_ind, + 'present': 0, + 'missing': total_ohlcv, + 'coverage_pct': 0.0 + }) + elif present_indicators[req_ind]['missing'] > 0: + missing_indicators.append({ + 'indicator': req_ind, + 'present': present_indicators[req_ind]['present'], + 'missing': present_indicators[req_ind]['missing'], + 'coverage_pct': present_indicators[req_ind]['coverage_pct'] + }) + + return { + 'symbol': symbol_u, + 'interval': interval, + 'total_ohlcv': total_ohlcv, + 'complete_records': complete_count, + 'incomplete_records': incomplete_count, + 'coverage_percent': round(coverage_percent, 2), + 'missing_indicators': missing_indicators, + 'required_indicators': len(self.REQUIRED_INDICATORS) + } + + async def get_ohlcv_missing_indicators(self, symbol: str, interval: str, + limit: int = 1000) -> List[Dict[str, Any]]: + """ + Get OHLCV records that are missing technical indicators. + Returns time ranges that need indicator calculation. + """ + if not self.pool: + return [] + + symbol_u = _safe_upper(symbol) + + async with self.pool.acquire() as conn: + rows = await conn.fetch( + """ + WITH ohlcv_with_counts AS ( + SELECT + o.time, + COUNT(DISTINCT ti.indicator_name) as indicator_count + FROM crypto_ohlcv o + LEFT JOIN technical_indicators ti ON + ti.time = o.time AND + ti.symbol = o.symbol AND + ti.interval = o.interval + WHERE o.symbol = $1 AND o.interval = $2 + GROUP BY o.time + HAVING COUNT(DISTINCT ti.indicator_name) < $3 + ORDER BY o.time DESC + LIMIT $4 + ) + SELECT time, indicator_count + FROM ohlcv_with_counts; + """, + symbol_u, interval, len(self.REQUIRED_INDICATORS), limit + ) + + return [{'time': row['time'].isoformat(), + 'existing_indicators': int(row['indicator_count'])} for row in rows] + + async def backfill_missing_indicators(self, symbol: str, interval: str, + batch_size: int = 200) -> Dict[str, Any]: + """ + Backfill missing technical indicators for existing OHLCV records. + Processes in batches to avoid memory issues. + """ + self.logger.info(f"Starting indicator backfill for {symbol} {interval}") + + symbol_u = _safe_upper(symbol) + total_processed = 0 + total_indicators_added = 0 + + try: + # Get coverage before backfill + coverage_before = await self.check_indicator_coverage(symbol_u, interval) + + if coverage_before['coverage_percent'] >= 99.9: + return { + 'status': 'success', + 'message': 'Indicator coverage already complete', + 'records_processed': 0, + 'indicators_added': 0, + 'coverage_before': coverage_before['coverage_percent'], + 'coverage_after': coverage_before['coverage_percent'] + } + + # Process in batches + while True: + # Get batch of OHLCV records needing indicators + missing_times = await self.get_ohlcv_missing_indicators( + symbol_u, interval, limit=batch_size + ) + + if not missing_times: + break + + # Fetch OHLCV data for calculation (need history for indicators) + # Get earliest time in batch and fetch enough history + earliest_time = min(datetime.fromisoformat(t['time']) for t in missing_times) + + # Fetch sufficient historical data (250 candles for SMA-200) + lookback_time = earliest_time - timedelta( + seconds=self._interval_to_seconds(interval) * 250 + ) + + ohlcv_data = await self.get_ohlcv_data_range( + symbol_u, interval, lookback_time, + datetime.fromisoformat(missing_times[0]['time']) + ) + + if len(ohlcv_data) < 50: + self.logger.warning( + f"Insufficient data for indicators: {len(ohlcv_data)} records" + ) + break + + # Calculate indicators using utils + from utils import calculate_technical_indicators, load_config + import pandas as pd + + config = load_config() + indicator_config = config.get('technical_indicators', {}) + + df = pd.DataFrame(ohlcv_data) + df['time'] = pd.to_datetime(df['time']) + df = df.sort_values('time') + df.set_index('time', inplace=True) + + # Rename for pandas_ta + df = df.rename(columns={ + 'open_price': 'open', + 'high_price': 'high', + 'low_price': 'low', + 'close_price': 'close' + }) + + indicators_data = calculate_technical_indicators(df, indicator_config) + + if indicators_data: + # Filter to only missing times + missing_time_set = {t['time'] for t in missing_times} + filtered_indicators = [ + ind for ind in indicators_data + if ind['time'].isoformat() in missing_time_set + ] + + if filtered_indicators: + await self.insert_indicators_batch(symbol_u, interval, filtered_indicators) + total_indicators_added += len(filtered_indicators) + total_processed += len(missing_times) + + self.logger.info( + f"Backfilled {len(filtered_indicators)} indicators for " + f"{len(missing_times)} OHLCV records" + ) + + # Small delay to avoid overwhelming the database + await asyncio.sleep(0.1) + + # Safety check: if we've processed many batches, break + if total_processed >= 10000: + self.logger.warning("Processed 10k+ records, stopping this batch") + break + + # Get coverage after backfill + coverage_after = await self.check_indicator_coverage(symbol_u, interval) + + return { + 'status': 'success', + 'symbol': symbol_u, + 'interval': interval, + 'records_processed': total_processed, + 'indicators_added': total_indicators_added, + 'coverage_before': coverage_before['coverage_percent'], + 'coverage_after': coverage_after['coverage_percent'], + 'improvement': round(coverage_after['coverage_percent'] - + coverage_before['coverage_percent'], 2) + } + + except Exception as e: + self.logger.error(f"Error backfilling indicators: {e}", exc_info=True) + return { + 'status': 'error', + 'error': str(e), + 'records_processed': total_processed, + 'indicators_added': total_indicators_added + } + + async def get_ohlcv_data_range(self, symbol: str, interval: str, + start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]: + """Get OHLCV data for a specific time range (for indicator calculation)""" + if not self.pool: + return [] + + symbol_u = _safe_upper(symbol) + start_u = _ensure_dt_aware_utc(start_time) + end_u = _ensure_dt_aware_utc(end_time) + + async with self.pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT time, symbol, exchange, interval, open_price, high_price, + low_price, close_price, volume, quote_volume, trade_count + FROM crypto_ohlcv + WHERE symbol = $1 AND interval = $2 + AND time >= $3 AND time <= $4 + ORDER BY time ASC; + """, + symbol_u, interval, start_u, end_u + ) + + return [dict(row) for row in rows] + + async def get_all_indicator_coverage_status(self) -> List[Dict[str, Any]]: + """Get indicator coverage status for all symbol/interval combinations""" + symbols = await self.get_available_symbols() + intervals = ['1m', '5m', '15m', '1h', '4h', '1d'] + + results = [] + for symbol in symbols: + for interval in intervals: + try: + coverage = await self.check_indicator_coverage(symbol, interval) + if coverage['total_ohlcv'] > 0: + results.append(coverage) + except Exception as e: + self.logger.error(f"Error checking coverage for {symbol} {interval}: {e}") + + return results + async def close(self): """Close database connection pool""" if self.pool: @@ -1665,7 +2002,6 @@ async def create_database_if_not_exists(): finally: await conn.close() - if __name__ == "__main__": async def test_db(): await create_database_if_not_exists()