Compare commits

6 Commits
main ... dev

Author SHA1 Message Date
787b37df87 setup 2025-10-09 09:41:52 +01:00
b391427023 feat: Add indicator coverage monitoring to gaps page
- Add "Indicator Coverage" tab alongside existing "Gap Detection" tab
- Display coverage summary with total/complete/incomplete combinations
- Show detailed coverage table with progress bars and status badges
- Add backfill buttons for individual symbol/interval pairs
- Add bulk backfill option for all incomplete indicators
- Include filter and search functionality for both tabs
- Show missing indicator counts and details per combination
- Real-time refresh capabilities for both gaps and indicators
- Maintain all existing gap detection functionality
- Provide visual progress bars showing coverage percentages
- Support batch operations with confirmation dialogs

This integrates indicator coverage monitoring into the existing gaps
interface, providing a unified data quality dashboard for monitoring
both OHLCV gaps and technical indicator completeness.
2025-10-09 09:07:40 +01:00
9db0c4ca80 feat: Add API endpoints for technical indicator coverage management
- Add GET /api/indicators/coverage/{symbol}/{interval} to check coverage
- Add GET /api/indicators/coverage/all for system-wide coverage status
- Add POST /api/indicators/backfill/{symbol}/{interval} to backfill missing indicators
- Add POST /api/indicators/backfill-all for bulk backfill operations
- Add GET /api/indicators/missing/{symbol}/{interval} to list incomplete records
- Add GET /api/indicators/summary for aggregate coverage statistics
- Support configurable batch_size and min_coverage_threshold parameters
- Return detailed results including before/after coverage percentages
- Provide summary statistics with coverage ranges and lowest coverage pairs
- Enable UI integration for monitoring and managing indicator completeness

These endpoints expose the db.py indicator coverage methods through the
web API, allowing users to monitor and maintain 100% technical indicator
coverage across all trading pairs via the web interface.
2025-10-09 08:55:22 +01:00
e891cd7c15 feat: Integrate 100% indicator coverage enforcement into data collection
- Add _ensure_indicator_coverage() to verify and backfill after data collection
- Add start_indicator_coverage_monitor() background task for periodic checks
- Configure coverage monitoring with ensure_100_percent_coverage flag
- Set coverage_check_interval_hours (default: 6 hours) for monitoring frequency
- Set backfill_batch_size (default: 200) for efficient backfilling
- Call coverage check after bulk downloads, gap fills, and candle generation
- Start indicator_coverage_monitor task in continuous collection mode
- Log coverage percentages and backfill results for transparency
- Ensure all OHLCV records have complete technical indicator coverage

This integrates the new db.py indicator coverage methods into the main
data collection workflow, ensuring 100% coverage is automatically
maintained across all symbol/interval combinations.
2025-10-09 08:51:09 +01:00
2708bcb176 feat: Add 100% technical indicator coverage tracking and backfilling
- Add check_indicator_coverage() to detect OHLCV records missing indicators
- Add get_ohlcv_missing_indicators() to identify specific gaps
- Add backfill_missing_indicators() to fill missing indicator data
- Add get_ohlcv_data_range() helper for fetching historical data
- Add get_all_indicator_coverage_status() for system-wide monitoring
- Define REQUIRED_INDICATORS constant with all 16 required indicators
- Process backfills in configurable batches to manage memory
- Calculate indicators using existing utils.calculate_technical_indicators()
- Track coverage statistics before/after backfill operations
- Support for automated indicator completeness verification

This ensures every crypto_ohlcv record has all 16 technical indicators
(adx_14, atr_14, bb_lower, bb_middle, bb_upper, ema_12, ema_26,
macd_histogram, macd_line, macd_signal, rsi_14, sma_20, sma_200,
sma_50, stoch_d, stoch_k) calculated and stored
2025-10-09 08:46:25 +01:00
4d0460944a Updated main to allow significantly more efficient downloading 2025-10-05 17:53:41 +01:00
8 changed files with 1936 additions and 733 deletions

3
.gitignore vendored
View File

@@ -11,6 +11,7 @@
# Icon must end with two \r # Icon must end with two \r
Icon Icon
# Thumbnails # Thumbnails
._* ._*
@@ -57,3 +58,5 @@ Temporary Items
# Custom rules (everything added below won't be overriden by 'Generate .gitignore File' if you use 'Update' option) # Custom rules (everything added below won't be overriden by 'Generate .gitignore File' if you use 'Update' option)
config.config
variables.env

338
db.py
View File

@@ -1635,6 +1635,343 @@ class DatabaseManager:
async with conn.transaction(): async with conn.transaction():
yield conn yield conn
# --------------------------
# Indicator Coverage Methods
# --------------------------
REQUIRED_INDICATORS = [
'adx_14', 'atr_14', 'bb_lower', 'bb_middle', 'bb_upper',
'ema_12', 'ema_26', 'macd_histogram', 'macd_line', 'macd_signal',
'rsi_14', 'sma_20', 'sma_200', 'sma_50', 'stoch_d', 'stoch_k'
]
async def check_indicator_coverage(self, symbol: str, interval: str) -> Dict[str, Any]:
"""
Check indicator coverage for a symbol/interval.
Returns detailed coverage statistics showing which OHLCV records are missing indicators.
"""
if not self.pool:
return {'total_ohlcv': 0, 'complete_records': 0, 'coverage_percent': 0.0}
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
# Get total OHLCV records
total_result = await conn.fetchrow(
"""
SELECT COUNT(*) as total
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2;
""",
symbol_u, interval
)
total_ohlcv = int(total_result['total']) if total_result else 0
if total_ohlcv == 0:
return {
'symbol': symbol_u,
'interval': interval,
'total_ohlcv': 0,
'complete_records': 0,
'incomplete_records': 0,
'coverage_percent': 0.0,
'missing_indicators': [],
}
# Count OHLCV records that have ALL 16 required indicators
complete_result = await conn.fetchrow(
"""
WITH ohlcv_times AS (
SELECT time
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
),
indicator_counts AS (
SELECT
o.time,
COUNT(DISTINCT ti.indicator_name) as indicator_count
FROM ohlcv_times o
LEFT JOIN technical_indicators ti ON
ti.time = o.time AND
ti.symbol = $1 AND
ti.interval = $2
GROUP BY o.time
)
SELECT
COUNT(*) FILTER (WHERE indicator_count = $3) as complete_count,
COUNT(*) FILTER (WHERE indicator_count < $3) as incomplete_count
FROM indicator_counts;
""",
symbol_u, interval, len(self.REQUIRED_INDICATORS)
)
complete_count = int(complete_result['complete_count']) if complete_result else 0
incomplete_count = int(complete_result['incomplete_count']) if complete_result else total_ohlcv
coverage_percent = (complete_count / total_ohlcv * 100) if total_ohlcv > 0 else 0.0
# Get list of indicators with missing coverage
missing_indicators_result = await conn.fetch(
"""
WITH ohlcv_count AS (
SELECT COUNT(*) as total
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
)
SELECT
indicator_name,
COUNT(*) as present_count,
(SELECT total FROM ohlcv_count) as total_ohlcv,
ROUND(COUNT(*)::decimal / (SELECT total FROM ohlcv_count) * 100, 2) as coverage_pct
FROM technical_indicators
WHERE symbol = $1 AND interval = $2
GROUP BY indicator_name
ORDER BY coverage_pct ASC;
""",
symbol_u, interval
)
missing_indicators = []
present_indicators = {row['indicator_name']: {
'present': int(row['present_count']),
'missing': total_ohlcv - int(row['present_count']),
'coverage_pct': float(row['coverage_pct'])
} for row in missing_indicators_result}
for req_ind in self.REQUIRED_INDICATORS:
if req_ind not in present_indicators:
missing_indicators.append({
'indicator': req_ind,
'present': 0,
'missing': total_ohlcv,
'coverage_pct': 0.0
})
elif present_indicators[req_ind]['missing'] > 0:
missing_indicators.append({
'indicator': req_ind,
'present': present_indicators[req_ind]['present'],
'missing': present_indicators[req_ind]['missing'],
'coverage_pct': present_indicators[req_ind]['coverage_pct']
})
return {
'symbol': symbol_u,
'interval': interval,
'total_ohlcv': total_ohlcv,
'complete_records': complete_count,
'incomplete_records': incomplete_count,
'coverage_percent': round(coverage_percent, 2),
'missing_indicators': missing_indicators,
'required_indicators': len(self.REQUIRED_INDICATORS)
}
async def get_ohlcv_missing_indicators(self, symbol: str, interval: str,
limit: int = 1000) -> List[Dict[str, Any]]:
"""
Get OHLCV records that are missing technical indicators.
Returns time ranges that need indicator calculation.
"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
WITH ohlcv_with_counts AS (
SELECT
o.time,
COUNT(DISTINCT ti.indicator_name) as indicator_count
FROM crypto_ohlcv o
LEFT JOIN technical_indicators ti ON
ti.time = o.time AND
ti.symbol = o.symbol AND
ti.interval = o.interval
WHERE o.symbol = $1 AND o.interval = $2
GROUP BY o.time
HAVING COUNT(DISTINCT ti.indicator_name) < $3
ORDER BY o.time DESC
LIMIT $4
)
SELECT time, indicator_count
FROM ohlcv_with_counts;
""",
symbol_u, interval, len(self.REQUIRED_INDICATORS), limit
)
return [{'time': row['time'].isoformat(),
'existing_indicators': int(row['indicator_count'])} for row in rows]
async def backfill_missing_indicators(self, symbol: str, interval: str,
batch_size: int = 200) -> Dict[str, Any]:
"""
Backfill missing technical indicators for existing OHLCV records.
Processes in batches to avoid memory issues.
"""
self.logger.info(f"Starting indicator backfill for {symbol} {interval}")
symbol_u = _safe_upper(symbol)
total_processed = 0
total_indicators_added = 0
try:
# Get coverage before backfill
coverage_before = await self.check_indicator_coverage(symbol_u, interval)
if coverage_before['coverage_percent'] >= 99.9:
return {
'status': 'success',
'message': 'Indicator coverage already complete',
'records_processed': 0,
'indicators_added': 0,
'coverage_before': coverage_before['coverage_percent'],
'coverage_after': coverage_before['coverage_percent']
}
# Process in batches
while True:
# Get batch of OHLCV records needing indicators
missing_times = await self.get_ohlcv_missing_indicators(
symbol_u, interval, limit=batch_size
)
if not missing_times:
break
# Fetch OHLCV data for calculation (need history for indicators)
# Get earliest time in batch and fetch enough history
earliest_time = min(datetime.fromisoformat(t['time']) for t in missing_times)
# Fetch sufficient historical data (250 candles for SMA-200)
lookback_time = earliest_time - timedelta(
seconds=self._interval_to_seconds(interval) * 250
)
ohlcv_data = await self.get_ohlcv_data_range(
symbol_u, interval, lookback_time,
datetime.fromisoformat(missing_times[0]['time'])
)
if len(ohlcv_data) < 50:
self.logger.warning(
f"Insufficient data for indicators: {len(ohlcv_data)} records"
)
break
# Calculate indicators using utils
from utils import calculate_technical_indicators, load_config
import pandas as pd
config = load_config()
indicator_config = config.get('technical_indicators', {})
df = pd.DataFrame(ohlcv_data)
df['time'] = pd.to_datetime(df['time'])
df = df.sort_values('time')
df.set_index('time', inplace=True)
# Rename for pandas_ta
df = df.rename(columns={
'open_price': 'open',
'high_price': 'high',
'low_price': 'low',
'close_price': 'close'
})
indicators_data = calculate_technical_indicators(df, indicator_config)
if indicators_data:
# Filter to only missing times
missing_time_set = {t['time'] for t in missing_times}
filtered_indicators = [
ind for ind in indicators_data
if ind['time'].isoformat() in missing_time_set
]
if filtered_indicators:
await self.insert_indicators_batch(symbol_u, interval, filtered_indicators)
total_indicators_added += len(filtered_indicators)
total_processed += len(missing_times)
self.logger.info(
f"Backfilled {len(filtered_indicators)} indicators for "
f"{len(missing_times)} OHLCV records"
)
# Small delay to avoid overwhelming the database
await asyncio.sleep(0.1)
# Safety check: if we've processed many batches, break
if total_processed >= 10000:
self.logger.warning("Processed 10k+ records, stopping this batch")
break
# Get coverage after backfill
coverage_after = await self.check_indicator_coverage(symbol_u, interval)
return {
'status': 'success',
'symbol': symbol_u,
'interval': interval,
'records_processed': total_processed,
'indicators_added': total_indicators_added,
'coverage_before': coverage_before['coverage_percent'],
'coverage_after': coverage_after['coverage_percent'],
'improvement': round(coverage_after['coverage_percent'] -
coverage_before['coverage_percent'], 2)
}
except Exception as e:
self.logger.error(f"Error backfilling indicators: {e}", exc_info=True)
return {
'status': 'error',
'error': str(e),
'records_processed': total_processed,
'indicators_added': total_indicators_added
}
async def get_ohlcv_data_range(self, symbol: str, interval: str,
start_time: datetime, end_time: datetime) -> List[Dict[str, Any]]:
"""Get OHLCV data for a specific time range (for indicator calculation)"""
if not self.pool:
return []
symbol_u = _safe_upper(symbol)
start_u = _ensure_dt_aware_utc(start_time)
end_u = _ensure_dt_aware_utc(end_time)
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT time, symbol, exchange, interval, open_price, high_price,
low_price, close_price, volume, quote_volume, trade_count
FROM crypto_ohlcv
WHERE symbol = $1 AND interval = $2
AND time >= $3 AND time <= $4
ORDER BY time ASC;
""",
symbol_u, interval, start_u, end_u
)
return [dict(row) for row in rows]
async def get_all_indicator_coverage_status(self) -> List[Dict[str, Any]]:
"""Get indicator coverage status for all symbol/interval combinations"""
symbols = await self.get_available_symbols()
intervals = ['1m', '5m', '15m', '1h', '4h', '1d']
results = []
for symbol in symbols:
for interval in intervals:
try:
coverage = await self.check_indicator_coverage(symbol, interval)
if coverage['total_ohlcv'] > 0:
results.append(coverage)
except Exception as e:
self.logger.error(f"Error checking coverage for {symbol} {interval}: {e}")
return results
async def close(self): async def close(self):
"""Close database connection pool""" """Close database connection pool"""
if self.pool: if self.pool:
@@ -1665,7 +2002,6 @@ async def create_database_if_not_exists():
finally: finally:
await conn.close() await conn.close()
if __name__ == "__main__": if __name__ == "__main__":
async def test_db(): async def test_db():
await create_database_if_not_exists() await create_database_if_not_exists()

679
main.py

File diff suppressed because it is too large Load Diff

20
setup.sh Normal file
View File

@@ -0,0 +1,20 @@
#!/bin/bash
# Create virtual environment
python3 -m venv venv
# Activate virtual environment
source venv/bin/activate
# Upgrade pip in the virtual environment
pip install --upgrade pip
# Install your packages
pip install websockets aiohttp python-binance pandas pandas-ta python-dotenv asyncpg fastapi uvicorn[standard] psycopg2
# Create a requirements.txt file for future use
pip freeze > requirements.txt
echo "Virtual environment created and packages installed!"
echo "To activate this environment in the future, run: source ~/market_data/venv/bin/activate"

View File

@@ -1,19 +1,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
ui_routes.py - API Endpoints and Route Handlers ui_routes.py - API Endpoints and Route Handlers
Defines all FastAPI routes and business logic for API endpoints Defines all FastAPI routes and business logic for API endpoints
""" """
import asyncio import asyncio
import logging import logging
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from typing import Dict, Any, Optional, List from typing import Dict, Any, Optional, List
from fastapi import HTTPException, Request from fastapi import HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse from fastapi.responses import HTMLResponse, JSONResponse
from dotenv import set_key, dotenv_values from dotenv import set_key, dotenv_values
# UI and models # UI and models
@@ -101,10 +101,205 @@ class APIRoutes:
try: try:
status = await get_current_status(self.db_manager, self.data_collector, self.config) status = await get_current_status(self.db_manager, self.data_collector, self.config)
return JSONResponse(content=serialize_for_json(status)) return JSONResponse(content=serialize_for_json(status))
except Exception as e: except Exception as e:
logger.error(f"Error getting stats: {e}", exc_info=True) logger.error(f"Error getting stats: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
# ---------------------------
# Indicator Coverage (NEW)
# ---------------------------
@self.app.get("/api/indicators/coverage/{symbol}/{interval}")
async def get_indicator_coverage(symbol: str, interval: str):
"""Get technical indicator coverage status for a symbol/interval"""
try:
if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized")
coverage = await self.db_manager.check_indicator_coverage(symbol.upper(), interval)
return _ok(coverage)
except Exception as e:
logger.error(f"Error checking indicator coverage: {e}", exc_info=True)
return _err(str(e), 500)
@self.app.get("/api/indicators/coverage/all")
async def get_all_indicator_coverage():
"""Get indicator coverage status for all symbol/interval combinations"""
try:
if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized")
logger.info("Fetching indicator coverage for all pairs")
coverage_list = await self.db_manager.get_all_indicator_coverage_status()
logger.info(f"Retrieved coverage for {len(coverage_list)} combinations")
return _ok(coverage_list)
except Exception as e:
logger.error(f"Error getting all indicator coverage: {e}", exc_info=True)
return _err(str(e), 500)
@self.app.post("/api/indicators/backfill/{symbol}/{interval}")
async def backfill_indicators(symbol: str, interval: str, request: Request):
"""Backfill missing technical indicators for a symbol/interval"""
try:
if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized")
body = await request.json() if request.headers.get('content-type') == 'application/json' else {}
batch_size = body.get('batch_size', 200)
logger.info(f"Starting indicator backfill for {symbol} {interval} (batch_size={batch_size})")
result = await self.db_manager.backfill_missing_indicators(
symbol.upper(), interval, batch_size=batch_size
)
logger.info(f"Indicator backfill completed: {result}")
return _ok(result)
except Exception as e:
logger.error(f"Error backfilling indicators: {e}", exc_info=True)
return _err(str(e), 500)
@self.app.post("/api/indicators/backfill-all")
async def backfill_all_indicators(request: Request):
"""Backfill missing indicators for all symbol/interval combinations with incomplete coverage"""
try:
if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized")
body = await request.json() if request.headers.get('content-type') == 'application/json' else {}
batch_size = body.get('batch_size', 200)
min_coverage_threshold = body.get('min_coverage_threshold', 99.9)
logger.info("Starting indicator backfill for all pairs with incomplete coverage")
# Get all coverage status
coverage_list = await self.db_manager.get_all_indicator_coverage_status()
# Filter to only those needing backfill
needs_backfill = [
c for c in coverage_list
if c['coverage_percent'] < min_coverage_threshold
]
logger.info(f"Found {len(needs_backfill)} combinations needing backfill")
results = []
for coverage in needs_backfill:
symbol = coverage['symbol']
interval = coverage['interval']
try:
result = await self.db_manager.backfill_missing_indicators(
symbol, interval, batch_size=batch_size
)
results.append({
'symbol': symbol,
'interval': interval,
'status': result['status'],
'coverage_before': result.get('coverage_before', 0),
'coverage_after': result.get('coverage_after', 0),
'indicators_added': result.get('indicators_added', 0)
})
logger.info(
f"Backfilled {symbol} {interval}: "
f"{result.get('coverage_before', 0):.2f}% → {result.get('coverage_after', 0):.2f}%"
)
# Small delay to avoid overwhelming the database
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"Error backfilling {symbol} {interval}: {e}")
results.append({
'symbol': symbol,
'interval': interval,
'status': 'error',
'error': str(e)
})
summary = {
'total_checked': len(coverage_list),
'needed_backfill': len(needs_backfill),
'processed': len(results),
'successful': len([r for r in results if r.get('status') == 'success']),
'failed': len([r for r in results if r.get('status') == 'error']),
'results': results
}
logger.info(f"Bulk indicator backfill complete: {summary['successful']}/{summary['processed']} successful")
return _ok(summary)
except Exception as e:
logger.error(f"Error in bulk indicator backfill: {e}", exc_info=True)
return _err(str(e), 500)
@self.app.get("/api/indicators/missing/{symbol}/{interval}")
async def get_missing_indicators(symbol: str, interval: str):
"""Get list of OHLCV records missing technical indicators"""
try:
if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized")
missing = await self.db_manager.get_ohlcv_missing_indicators(
symbol.upper(), interval, limit=100
)
return _ok({
'symbol': symbol.upper(),
'interval': interval,
'missing_count': len(missing),
'missing_records': missing
})
except Exception as e:
logger.error(f"Error getting missing indicators: {e}", exc_info=True)
return _err(str(e), 500)
@self.app.get("/api/indicators/summary")
async def get_indicators_summary():
"""Get summary of indicator coverage across all symbols"""
try:
if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized")
coverage_list = await self.db_manager.get_all_indicator_coverage_status()
# Calculate summary statistics
total_combinations = len(coverage_list)
complete_combinations = len([c for c in coverage_list if c['coverage_percent'] >= 99.9])
incomplete_combinations = total_combinations - complete_combinations
avg_coverage = sum(c['coverage_percent'] for c in coverage_list) / total_combinations if total_combinations > 0 else 0
# Group by coverage ranges
coverage_ranges = {
'100%': len([c for c in coverage_list if c['coverage_percent'] >= 99.9]),
'90-99%': len([c for c in coverage_list if 90 <= c['coverage_percent'] < 99.9]),
'50-89%': len([c for c in coverage_list if 50 <= c['coverage_percent'] < 90]),
'0-49%': len([c for c in coverage_list if c['coverage_percent'] < 50]),
}
summary = {
'total_combinations': total_combinations,
'complete_combinations': complete_combinations,
'incomplete_combinations': incomplete_combinations,
'average_coverage': round(avg_coverage, 2),
'coverage_ranges': coverage_ranges,
'lowest_coverage': sorted(coverage_list, key=lambda x: x['coverage_percent'])[:10] if coverage_list else []
}
return _ok(summary)
except Exception as e:
logger.error(f"Error getting indicators summary: {e}", exc_info=True)
return _err(str(e), 500)
# --------------------------- # ---------------------------
# Gaps and Coverage # Gaps and Coverage
# --------------------------- # ---------------------------
@@ -116,10 +311,13 @@ class APIRoutes:
if not self.db_manager: if not self.db_manager:
logger.error("Database manager not initialized") logger.error("Database manager not initialized")
return _err("Database not initialized", 500) return _err("Database not initialized", 500)
logger.info("Fetching gap status for all pairs") logger.info("Fetching gap status for all pairs")
status = await self.db_manager.get_all_pairs_gap_status() status = await self.db_manager.get_all_pairs_gap_status()
logger.info(f"Retrieved gap status for {len(status)} pair-interval combinations") logger.info(f"Retrieved gap status for {len(status)} pair-interval combinations")
return _ok(status) return _ok(status)
except Exception as e: except Exception as e:
logger.error(f"Error getting all pairs gaps: {e}", exc_info=True) logger.error(f"Error getting all pairs gaps: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -133,8 +331,10 @@ class APIRoutes:
sym = symbol.upper() sym = symbol.upper()
gap_info = await self.db_manager.detect_gaps(sym, interval) gap_info = await self.db_manager.detect_gaps(sym, interval)
end_date = datetime.utcnow() end_date = datetime.utcnow()
start_date = end_date - timedelta(days=90) start_date = end_date - timedelta(days=90)
daily_coverage = await self.db_manager.get_data_coverage_by_day(sym, interval, start_date, end_date) daily_coverage = await self.db_manager.get_data_coverage_by_day(sym, interval, start_date, end_date)
data = { data = {
@@ -144,7 +344,9 @@ class APIRoutes:
"gaps": gap_info.get('gaps', []), "gaps": gap_info.get('gaps', []),
"daily_coverage": daily_coverage.get('daily_coverage', []), "daily_coverage": daily_coverage.get('daily_coverage', []),
} }
return _ok(data) return _ok(data)
except Exception as e: except Exception as e:
logger.error(f"Error getting gap details: {e}", exc_info=True) logger.error(f"Error getting gap details: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -165,8 +367,10 @@ class APIRoutes:
raise HTTPException(status_code=500, detail="Database not initialized") raise HTTPException(status_code=500, detail="Database not initialized")
result = await self.db_manager.fill_gaps_intelligently(symbol.upper(), interval, max_attempts) result = await self.db_manager.fill_gaps_intelligently(symbol.upper(), interval, max_attempts)
logger.info(f"Intelligent gap fill completed: {result}") logger.info(f"Intelligent gap fill completed: {result}")
return _ok(result) return _ok(result)
except Exception as e: except Exception as e:
logger.error(f"Error in intelligent gap fill: {e}", exc_info=True) logger.error(f"Error in intelligent gap fill: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -177,8 +381,10 @@ class APIRoutes:
try: try:
if not self.db_manager: if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized") raise HTTPException(status_code=500, detail="Database not initialized")
prioritized = await self.db_manager.get_prioritized_gaps(symbol.upper(), interval) prioritized = await self.db_manager.get_prioritized_gaps(symbol.upper(), interval)
return _ok(prioritized) return _ok(prioritized)
except Exception as e: except Exception as e:
logger.error(f"Error getting prioritized gaps: {e}", exc_info=True) logger.error(f"Error getting prioritized gaps: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -189,8 +395,10 @@ class APIRoutes:
try: try:
if not self.db_manager: if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized") raise HTTPException(status_code=500, detail="Database not initialized")
progress = await self.db_manager.get_gap_fill_progress(symbol.upper(), interval) progress = await self.db_manager.get_gap_fill_progress(symbol.upper(), interval)
return _ok(progress) return _ok(progress)
except Exception as e: except Exception as e:
logger.error(f"Error getting gap progress: {e}", exc_info=True) logger.error(f"Error getting gap progress: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -201,8 +409,10 @@ class APIRoutes:
try: try:
if not self.db_manager: if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized") raise HTTPException(status_code=500, detail="Database not initialized")
health = await self.db_manager.check_data_health(symbol.upper(), interval) health = await self.db_manager.check_data_health(symbol.upper(), interval)
return _ok(health) return _ok(health)
except Exception as e: except Exception as e:
logger.error(f"Error checking data health: {e}", exc_info=True) logger.error(f"Error checking data health: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -219,21 +429,30 @@ class APIRoutes:
intervals = cfg.get('collection', {}).get('candle_intervals', ['1m', '5m', '15m', '1h', '4h', '1d']) intervals = cfg.get('collection', {}).get('candle_intervals', ['1m', '5m', '15m', '1h', '4h', '1d'])
results: List[Dict[str, Any]] = [] results: List[Dict[str, Any]] = []
for interval in intervals: for interval in intervals:
prioritized = await self.db_manager.get_prioritized_gaps(symbol.upper(), interval) prioritized = await self.db_manager.get_prioritized_gaps(symbol.upper(), interval)
if not prioritized: if not prioritized:
continue continue
filled = 0 filled = 0
for gap in prioritized[:5]: for gap in prioritized[:5]: # Only fill top 5 gaps
if gap.get('missing_candles', 0) <= 100: if gap.get('missing_candles', 0) <= 100:
try: try:
await self.db_manager.fill_gaps_intelligently(symbol.upper(), interval, max_attempts=3) await self.db_manager.fill_gaps_intelligently(symbol.upper(), interval, max_attempts=3)
filled += 1 filled += 1
except Exception as e: except Exception as e:
logger.error(f"Error filling gap: {e}") logger.error(f"Error filling gap: {e}")
results.append({'interval': interval, 'gaps_filled': filled, 'total_gaps': len(prioritized)}) results.append({'interval': interval, 'gaps_filled': filled, 'total_gaps': len(prioritized)})
return JSONResponse(content={"status": "success", "message": f"Smart fill completed for {symbol}", "data": results}) return JSONResponse(content={
"status": "success",
"message": f"Smart fill completed for {symbol}",
"data": results
})
except Exception as e: except Exception as e:
logger.error(f"Error in smart fill: {e}", exc_info=True) logger.error(f"Error in smart fill: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -247,6 +466,7 @@ class APIRoutes:
gap_start = datetime.fromisoformat(request.gap_start) gap_start = datetime.fromisoformat(request.gap_start)
gap_end = datetime.fromisoformat(request.gap_end) gap_end = datetime.fromisoformat(request.gap_end)
gap_start = _tz_aware(gap_start) gap_start = _tz_aware(gap_start)
gap_end = _tz_aware(gap_end) gap_end = _tz_aware(gap_end)
@@ -256,8 +476,10 @@ class APIRoutes:
gap_end, gap_end,
[request.interval], [request.interval],
) )
logger.info(f"Gap filled for {request.symbol} {request.interval}") logger.info(f"Gap filled for {request.symbol} {request.interval}")
return JSONResponse(content={"status": "success", "message": "Gap filled successfully"}) return JSONResponse(content={"status": "success", "message": "Gap filled successfully"})
except Exception as e: except Exception as e:
logger.error(f"Error filling gap: {e}", exc_info=True) logger.error(f"Error filling gap: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -268,13 +490,20 @@ class APIRoutes:
try: try:
if not self.data_collector: if not self.data_collector:
raise HTTPException(status_code=500, detail="Data collector not initialized") raise HTTPException(status_code=500, detail="Data collector not initialized")
result = await self.data_collector.auto_fill_gaps( result = await self.data_collector.auto_fill_gaps(
request.symbol.upper(), request.symbol.upper(),
request.intervals, request.intervals,
request.fill_genuine_gaps, request.fill_genuine_gaps,
) )
logger.info(f"Auto gap fill completed for {request.symbol}: {result}") logger.info(f"Auto gap fill completed for {request.symbol}: {result}")
return JSONResponse(content={"status": "success", "message": f"Filled gaps for {request.symbol}", "result": serialize_for_json(result)}) return JSONResponse(content={
"status": "success",
"message": f"Filled gaps for {request.symbol}",
"result": serialize_for_json(result)
})
except Exception as e: except Exception as e:
logger.error(f"Error in auto gap fill: {e}", exc_info=True) logger.error(f"Error in auto gap fill: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -285,8 +514,10 @@ class APIRoutes:
try: try:
if not self.db_manager: if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized") raise HTTPException(status_code=500, detail="Database not initialized")
summary = await self.db_manager.get_all_gaps_summary() summary = await self.db_manager.get_all_gaps_summary()
return _ok(summary) return _ok(summary)
except Exception as e: except Exception as e:
logger.error(f"Error getting gaps summary: {e}", exc_info=True) logger.error(f"Error getting gaps summary: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -297,8 +528,10 @@ class APIRoutes:
try: try:
if not self.db_manager: if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized") raise HTTPException(status_code=500, detail="Database not initialized")
status = await self.db_manager.get_gap_fill_status(symbol.upper(), interval) status = await self.db_manager.get_gap_fill_status(symbol.upper(), interval)
return _ok(status) return _ok(status)
except Exception as e: except Exception as e:
logger.error(f"Error getting gap status: {e}", exc_info=True) logger.error(f"Error getting gap status: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -309,8 +542,10 @@ class APIRoutes:
try: try:
if not self.db_manager: if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized") raise HTTPException(status_code=500, detail="Database not initialized")
gaps = await self.db_manager.detect_gaps(symbol.upper(), interval) gaps = await self.db_manager.detect_gaps(symbol.upper(), interval)
return JSONResponse(content={"status": "success", "gaps": serialize_for_json(gaps)}) return JSONResponse(content={"status": "success", "gaps": serialize_for_json(gaps)})
except Exception as e: except Exception as e:
logger.error(f"Error detecting gaps: {e}", exc_info=True) logger.error(f"Error detecting gaps: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -321,12 +556,15 @@ class APIRoutes:
try: try:
if not self.db_manager: if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized") raise HTTPException(status_code=500, detail="Database not initialized")
gap_config = self.config.get('gap_filling', {}) gap_config = self.config.get('gap_filling', {})
max_consecutive = int(gap_config.get('max_consecutive_empty_candles', 5)) max_consecutive = int(gap_config.get('max_consecutive_empty_candles', 5))
lookback = int(gap_config.get('averaging_lookback_candles', 10)) lookback = int(gap_config.get('averaging_lookback_candles', 10))
filled_count = await self.db_manager.fill_genuine_gaps_with_averages( filled_count = await self.db_manager.fill_genuine_gaps_with_averages(
symbol.upper(), interval, max_consecutive, lookback symbol.upper(), interval, max_consecutive, lookback
) )
logger.info(f"Filled {filled_count} genuine gaps for {symbol} {interval}") logger.info(f"Filled {filled_count} genuine gaps for {symbol} {interval}")
return JSONResponse( return JSONResponse(
content={ content={
@@ -335,6 +573,7 @@ class APIRoutes:
"filled_count": filled_count, "filled_count": filled_count,
} }
) )
except Exception as e: except Exception as e:
logger.error(f"Error filling genuine gaps: {e}", exc_info=True) logger.error(f"Error filling genuine gaps: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -350,9 +589,11 @@ class APIRoutes:
if not self.db_manager: if not self.db_manager:
logger.error("Database manager not initialized") logger.error("Database manager not initialized")
return JSONResponse(content={"status": "error", "symbols": []}, status_code=500) return JSONResponse(content={"status": "error", "symbols": []}, status_code=500)
symbols = await self.db_manager.get_available_symbols() symbols = await self.db_manager.get_available_symbols()
logger.info(f"Retrieved {len(symbols)} symbols from database") logger.info(f"Retrieved {len(symbols)} symbols from database")
return JSONResponse(content={"status": "success", "symbols": symbols}) return JSONResponse(content={"status": "success", "symbols": symbols})
except Exception as e: except Exception as e:
logger.error(f"Error getting symbols: {e}", exc_info=True) logger.error(f"Error getting symbols: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "symbols": []}, status_code=500) return JSONResponse(content={"status": "error", "symbols": []}, status_code=500)
@@ -364,15 +605,21 @@ class APIRoutes:
if not self.db_manager: if not self.db_manager:
logger.error("Database manager not initialized") logger.error("Database manager not initialized")
return _err("Database not initialized", 500) return _err("Database not initialized", 500)
logger.info(f"Price trends request for {symbol}") logger.info(f"Price trends request for {symbol}")
data = await self.db_manager.get_current_price_and_trends_with_volume(symbol.upper()) data = await self.db_manager.get_current_price_and_trends_with_volume(symbol.upper())
if not data: if not data:
logger.warning(f"No price data found for {symbol}") logger.warning(f"No price data found for {symbol}")
return _err(f"No data found for {symbol}. Please start data collection first.", 404) return _err(f"No data found for {symbol}. Please start data collection first.", 404)
pair_config = next((p for p in self.config.get('trading_pairs', []) if p['symbol'] == symbol.upper()), None) pair_config = next((p for p in self.config.get('trading_pairs', []) if p['symbol'] == symbol.upper()), None)
data['enabled'] = pair_config.get('enabled', False) if pair_config else False data['enabled'] = pair_config.get('enabled', False) if pair_config else False
logger.info(f"Returning price trends for {symbol}: price={data.get('current_price')}") logger.info(f"Returning price trends for {symbol}: price={data.get('current_price')}")
return _ok(data) return _ok(data)
except Exception as e: except Exception as e:
logger.error(f"Error getting price trends: {e}", exc_info=True) logger.error(f"Error getting price trends: {e}", exc_info=True)
return _err(f"Error retrieving price trends: {str(e)}", 500) return _err(f"Error retrieving price trends: {str(e)}", 500)
@@ -387,12 +634,16 @@ class APIRoutes:
try: try:
if not self.data_collector: if not self.data_collector:
raise HTTPException(status_code=500, detail="Data collector not initialized") raise HTTPException(status_code=500, detail="Data collector not initialized")
if self.state_manager.get("is_collecting", False): if self.state_manager.get("is_collecting", False):
return JSONResponse(content={"status": "info", "message": "Collection already running"}) return JSONResponse(content={"status": "info", "message": "Collection already running"})
await self.data_collector.start_continuous_collection() await self.data_collector.start_continuous_collection()
self.state_manager.update(is_collecting=True) self.state_manager.update(is_collecting=True)
logger.info("Collection started via API") logger.info("Collection started via API")
return JSONResponse(content={"status": "success", "message": "Collection started"}) return JSONResponse(content={"status": "success", "message": "Collection started"})
except Exception as e: except Exception as e:
logger.error(f"Error starting collection: {e}", exc_info=True) logger.error(f"Error starting collection: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -403,12 +654,16 @@ class APIRoutes:
try: try:
if not self.data_collector: if not self.data_collector:
raise HTTPException(status_code=500, detail="Data collector not initialized") raise HTTPException(status_code=500, detail="Data collector not initialized")
if not self.state_manager.get("is_collecting", False): if not self.state_manager.get("is_collecting", False):
return JSONResponse(content={"status": "info", "message": "Collection not running"}) return JSONResponse(content={"status": "info", "message": "Collection not running"})
await self.data_collector.stop_continuous_collection() await self.data_collector.stop_continuous_collection()
self.state_manager.update(is_collecting=False) self.state_manager.update(is_collecting=False)
logger.info("Collection stopped via API") logger.info("Collection stopped via API")
return JSONResponse(content={"status": "success", "message": "Collection stopped"}) return JSONResponse(content={"status": "success", "message": "Collection stopped"})
except Exception as e: except Exception as e:
logger.error(f"Error stopping collection: {e}", exc_info=True) logger.error(f"Error stopping collection: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -423,6 +678,7 @@ class APIRoutes:
try: try:
cfg = load_config() cfg = load_config()
return JSONResponse(content=serialize_for_json(cfg)) return JSONResponse(content=serialize_for_json(cfg))
except Exception as e: except Exception as e:
logger.error(f"Error getting config: {e}", exc_info=True) logger.error(f"Error getting config: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@@ -432,9 +688,11 @@ class APIRoutes:
"""Update configuration - accepts raw JSON body""" """Update configuration - accepts raw JSON body"""
try: try:
body = await request.json() body = await request.json()
logger.info(f"Received config update keys: {list(body.keys())}") logger.info(f"Received config update keys: {list(body.keys())}")
current_config = load_config() current_config = load_config()
# Deep merge/replace top-level keys # Deep merge/replace top-level keys
for key, value in body.items(): for key, value in body.items():
if key in current_config and isinstance(current_config[key], dict) and isinstance(value, dict): if key in current_config and isinstance(current_config[key], dict) and isinstance(value, dict):
@@ -443,10 +701,13 @@ class APIRoutes:
current_config[key] = value current_config[key] = value
save_config(current_config) save_config(current_config)
self.config.clear() self.config.clear()
self.config.update(current_config) self.config.update(current_config)
logger.info("Configuration updated successfully") logger.info("Configuration updated successfully")
return JSONResponse(content={"status": "success", "message": "Configuration updated"}) return JSONResponse(content={"status": "success", "message": "Configuration updated"})
except Exception as e: except Exception as e:
logger.error(f"Error updating config: {e}", exc_info=True) logger.error(f"Error updating config: {e}", exc_info=True)
return _err(str(e), 500) return _err(str(e), 500)
@@ -459,22 +720,28 @@ class APIRoutes:
return JSONResponse(content={"status": "error", "message": "Invalid symbol format"}, status_code=400) return JSONResponse(content={"status": "error", "message": "Invalid symbol format"}, status_code=400)
cfg = load_config() cfg = load_config()
existing = [p for p in cfg.get('trading_pairs', []) if p['symbol'] == pair.symbol.upper()] existing = [p for p in cfg.get('trading_pairs', []) if p['symbol'] == pair.symbol.upper()]
if existing: if existing:
return JSONResponse(content={"status": "error", "message": "Trading pair already exists"}, status_code=409) return JSONResponse(content={"status": "error", "message": "Trading pair already exists"}, status_code=409)
record_from_date = pair.record_from_date or cfg.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z') record_from_date = pair.record_from_date or cfg.get('collection', {}).get('default_record_from_date', '2020-01-01T00:00:00Z')
cfg.setdefault('trading_pairs', []).append({ cfg.setdefault('trading_pairs', []).append({
'symbol': pair.symbol.upper(), 'symbol': pair.symbol.upper(),
'enabled': True, 'enabled': True,
'priority': pair.priority, 'priority': pair.priority,
'record_from_date': record_from_date, 'record_from_date': record_from_date,
}) })
save_config(cfg) save_config(cfg)
self.config.clear() self.config.clear()
self.config.update(cfg) self.config.update(cfg)
logger.info(f"Added trading pair: {pair.symbol}") logger.info(f"Added trading pair: {pair.symbol}")
return JSONResponse(content={"status": "success", "message": f"Added {pair.symbol}"}) return JSONResponse(content={"status": "success", "message": f"Added {pair.symbol}"})
except Exception as e: except Exception as e:
logger.error(f"Error adding trading pair: {e}", exc_info=True) logger.error(f"Error adding trading pair: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -485,6 +752,7 @@ class APIRoutes:
try: try:
update = await request.json() update = await request.json()
logger.info(f"Updating trading pair {symbol}: {update}") logger.info(f"Updating trading pair {symbol}: {update}")
cfg = load_config() cfg = load_config()
pair_found = False pair_found = False
@@ -496,6 +764,7 @@ class APIRoutes:
pair['priority'] = int(update['priority']) pair['priority'] = int(update['priority'])
if 'record_from_date' in update: if 'record_from_date' in update:
pair['record_from_date'] = update['record_from_date'] pair['record_from_date'] = update['record_from_date']
pair_found = True pair_found = True
break break
@@ -503,10 +772,13 @@ class APIRoutes:
return JSONResponse(content={"status": "error", "message": "Trading pair not found"}, status_code=404) return JSONResponse(content={"status": "error", "message": "Trading pair not found"}, status_code=404)
save_config(cfg) save_config(cfg)
self.config.clear() self.config.clear()
self.config.update(cfg) self.config.update(cfg)
logger.info(f"Updated trading pair: {symbol}") logger.info(f"Updated trading pair: {symbol}")
return JSONResponse(content={"status": "success", "message": f"Updated {symbol}"}) return JSONResponse(content={"status": "success", "message": f"Updated {symbol}"})
except Exception as e: except Exception as e:
logger.error(f"Error updating trading pair: {e}", exc_info=True) logger.error(f"Error updating trading pair: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -516,6 +788,7 @@ class APIRoutes:
"""Remove a trading pair""" """Remove a trading pair"""
try: try:
cfg = load_config() cfg = load_config()
original_count = len(cfg.get('trading_pairs', [])) original_count = len(cfg.get('trading_pairs', []))
cfg['trading_pairs'] = [p for p in cfg.get('trading_pairs', []) if p['symbol'] != symbol.upper()] cfg['trading_pairs'] = [p for p in cfg.get('trading_pairs', []) if p['symbol'] != symbol.upper()]
@@ -523,10 +796,13 @@ class APIRoutes:
return JSONResponse(content={"status": "error", "message": "Trading pair not found"}, status_code=404) return JSONResponse(content={"status": "error", "message": "Trading pair not found"}, status_code=404)
save_config(cfg) save_config(cfg)
self.config.clear() self.config.clear()
self.config.update(cfg) self.config.update(cfg)
logger.info(f"Removed trading pair: {symbol}") logger.info(f"Removed trading pair: {symbol}")
return JSONResponse(content={"status": "success", "message": f"Removed {symbol}"}) return JSONResponse(content={"status": "success", "message": f"Removed {symbol}"})
except Exception as e: except Exception as e:
logger.error(f"Error removing trading pair: {e}", exc_info=True) logger.error(f"Error removing trading pair: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -537,17 +813,26 @@ class APIRoutes:
try: try:
cfg = load_config() cfg = load_config()
enabled_indicators = cfg.setdefault('technical_indicators', {}).setdefault('enabled', []) enabled_indicators = cfg.setdefault('technical_indicators', {}).setdefault('enabled', [])
if indicator_name in enabled_indicators: if indicator_name in enabled_indicators:
enabled_indicators.remove(indicator_name) enabled_indicators.remove(indicator_name)
action = "disabled" action = "disabled"
else: else:
enabled_indicators.append(indicator_name) enabled_indicators.append(indicator_name)
action = "enabled" action = "enabled"
save_config(cfg) save_config(cfg)
self.config.clear() self.config.clear()
self.config.update(cfg) self.config.update(cfg)
logger.info(f"Indicator {indicator_name} {action}") logger.info(f"Indicator {indicator_name} {action}")
return JSONResponse(content={"status": "success", "message": f"Indicator {indicator_name} {action}", "enabled": indicator_name in enabled_indicators}) return JSONResponse(content={
"status": "success",
"message": f"Indicator {indicator_name} {action}",
"enabled": indicator_name in enabled_indicators
})
except Exception as e: except Exception as e:
logger.error(f"Error toggling indicator: {e}", exc_info=True) logger.error(f"Error toggling indicator: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -558,20 +843,26 @@ class APIRoutes:
try: try:
body = await request.json() body = await request.json()
periods = body.get('periods') periods = body.get('periods')
if periods is None: if periods is None:
return JSONResponse(content={"status": "error", "message": "Missing 'periods' in request"}, status_code=400) return JSONResponse(content={"status": "error", "message": "Missing 'periods' in request"}, status_code=400)
cfg = load_config() cfg = load_config()
periods_cfg = cfg.setdefault('technical_indicators', {}).setdefault('periods', {}) periods_cfg = cfg.setdefault('technical_indicators', {}).setdefault('periods', {})
if indicator_name not in periods_cfg: if indicator_name not in periods_cfg:
return JSONResponse(content={"status": "error", "message": f"Unknown indicator: {indicator_name}"}, status_code=404) return JSONResponse(content={"status": "error", "message": f"Unknown indicator: {indicator_name}"}, status_code=404)
periods_cfg[indicator_name] = periods periods_cfg[indicator_name] = periods
save_config(cfg) save_config(cfg)
self.config.clear() self.config.clear()
self.config.update(cfg) self.config.update(cfg)
logger.info(f"Updated {indicator_name} periods to {periods}") logger.info(f"Updated {indicator_name} periods to {periods}")
return JSONResponse(content={"status": "success", "message": f"Updated {indicator_name} periods"}) return JSONResponse(content={"status": "success", "message": f"Updated {indicator_name} periods"})
except Exception as e: except Exception as e:
logger.error(f"Error updating indicator periods: {e}", exc_info=True) logger.error(f"Error updating indicator periods: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -589,13 +880,21 @@ class APIRoutes:
return JSONResponse(content={"status": "error", "message": "Database not initialized"}, status_code=500) return JSONResponse(content={"status": "error", "message": "Database not initialized"}, status_code=500)
logger.info(f"Chart data request: symbol={request.symbol}, interval={request.interval}, limit={request.limit}") logger.info(f"Chart data request: symbol={request.symbol}, interval={request.interval}, limit={request.limit}")
data = await self.db_manager.get_recent_candles(request.symbol.upper(), request.interval, request.limit) data = await self.db_manager.get_recent_candles(request.symbol.upper(), request.interval, request.limit)
logger.info(f"Retrieved {len(data) if data else 0} candles from database") logger.info(f"Retrieved {len(data) if data else 0} candles from database")
if not data: if not data:
logger.warning(f"No data found for {request.symbol} at {request.interval}") logger.warning(f"No data found for {request.symbol} at {request.interval}")
return JSONResponse(content={"status": "error", "message": f"No data found for {request.symbol} at {request.interval}. Please start data collection or download historical data first."}, status_code=404) return JSONResponse(content={
"status": "error",
"message": f"No data found for {request.symbol} at {request.interval}. Please start data collection or download historical data first."
}, status_code=404)
logger.info(f"Returning {len(data)} candles for {request.symbol}") logger.info(f"Returning {len(data)} candles for {request.symbol}")
return JSONResponse(content={"status": "success", "data": data}) return JSONResponse(content={"status": "success", "data": data})
except Exception as e: except Exception as e:
logger.error(f"Error getting chart data: {e}", exc_info=True) logger.error(f"Error getting chart data: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": f"Error retrieving chart data: {str(e)}"}, status_code=500) return JSONResponse(content={"status": "error", "message": f"Error retrieving chart data: {str(e)}"}, status_code=500)
@@ -609,32 +908,42 @@ class APIRoutes:
start_date = datetime.fromisoformat(request.start_date) start_date = datetime.fromisoformat(request.start_date)
end_date = datetime.fromisoformat(request.end_date) if request.end_date else datetime.utcnow() end_date = datetime.fromisoformat(request.end_date) if request.end_date else datetime.utcnow()
start_date = _tz_aware(start_date) start_date = _tz_aware(start_date)
end_date = _tz_aware(end_date) end_date = _tz_aware(end_date)
intervals = request.intervals or ['1h', '4h', '1d'] intervals = request.intervals or ['1h', '4h', '1d']
results = []
results = []
for symbol in request.symbols: for symbol in request.symbols:
try: try:
symu = symbol.upper() symu = symbol.upper()
# Initialize progress for UI # Initialize progress for UI
self.data_collector.download_progress[symu] = { self.data_collector.download_progress[symu] = {
'status': 'pending', 'status': 'pending',
'intervals': {i: {'status': 'pending', 'records': 0} for i in intervals}, 'intervals': {i: {'status': 'pending', 'records': 0} for i in intervals},
'start_time': datetime.now(timezone.utc).isoformat(), 'start_time': datetime.now(timezone.utc).isoformat(),
} }
# Spawn task # Spawn task
task = asyncio.create_task( task = asyncio.create_task(
self.data_collector.bulk_download_historical_data(symu, start_date, end_date, intervals) self.data_collector.bulk_download_historical_data(symu, start_date, end_date, intervals)
) )
results.append({'symbol': symu, 'status': 'started', 'intervals': intervals}) results.append({'symbol': symu, 'status': 'started', 'intervals': intervals})
logger.info(f"Bulk download started for {symbol}") logger.info(f"Bulk download started for {symbol}")
except Exception as ie: except Exception as ie:
logger.error(f"Error starting bulk download for {symbol}: {ie}") logger.error(f"Error starting bulk download for {symbol}: {ie}")
results.append({'symbol': symu, 'status': 'error', 'error': str(ie)}) results.append({'symbol': symu, 'status': 'error', 'error': str(ie)})
return JSONResponse(content={"status": "success", "message": f"Bulk download started for {len(request.symbols)} symbol(s)", "results": results}) return JSONResponse(content={
"status": "success",
"message": f"Bulk download started for {len(request.symbols)} symbol(s)",
"results": results
})
except Exception as e: except Exception as e:
logger.error(f"Error starting bulk download: {e}", exc_info=True) logger.error(f"Error starting bulk download: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -645,8 +954,10 @@ class APIRoutes:
try: try:
if not self.data_collector: if not self.data_collector:
return JSONResponse(content={"status": "error", "message": "Data collector not initialized"}, status_code=500) return JSONResponse(content={"status": "error", "message": "Data collector not initialized"}, status_code=500)
progress = await self.data_collector.get_download_progress() progress = await self.data_collector.get_download_progress()
return JSONResponse(content={"status": "success", "downloads": serialize_for_json(progress)}) return JSONResponse(content={"status": "success", "downloads": serialize_for_json(progress)})
except Exception as e: except Exception as e:
logger.error(f"Error getting download progress: {e}", exc_info=True) logger.error(f"Error getting download progress: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -660,11 +971,14 @@ class APIRoutes:
"""Get environment variables""" """Get environment variables"""
try: try:
env_vars = dotenv_values('variables.env') or {} env_vars = dotenv_values('variables.env') or {}
safe_vars = { safe_vars = {
k: ('***' if any(s in k.upper() for s in ['SECRET', 'KEY', 'PASSWORD', 'TOKEN']) else v) k: ('***' if any(s in k.upper() for s in ['SECRET', 'KEY', 'PASSWORD', 'TOKEN']) else v)
for k, v in env_vars.items() for k, v in env_vars.items()
} }
return JSONResponse(content=safe_vars) return JSONResponse(content=safe_vars)
except Exception as e: except Exception as e:
logger.error(f"Error getting env vars: {e}", exc_info=True) logger.error(f"Error getting env vars: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@@ -675,11 +989,15 @@ class APIRoutes:
try: try:
key_upper = env_update.key.upper() key_upper = env_update.key.upper()
display_value = env_update.value if not any(s in key_upper for s in ['PASSWORD', 'SECRET', 'KEY', 'TOKEN']) else '***' display_value = env_update.value if not any(s in key_upper for s in ['PASSWORD', 'SECRET', 'KEY', 'TOKEN']) else '***'
logger.info(f"Updating env var: {env_update.key} = {display_value}") logger.info(f"Updating env var: {env_update.key} = {display_value}")
set_key('variables.env', env_update.key, env_update.value) set_key('variables.env', env_update.key, env_update.value)
reload_env_vars('variables.env') reload_env_vars('variables.env')
logger.info(f"Updated and reloaded env var: {env_update.key}") logger.info(f"Updated and reloaded env var: {env_update.key}")
return JSONResponse(content={"status": "success", "message": f"Updated {env_update.key}"}) return JSONResponse(content={"status": "success", "message": f"Updated {env_update.key}"})
except Exception as e: except Exception as e:
logger.error(f"Error updating env var: {e}", exc_info=True) logger.error(f"Error updating env var: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -694,12 +1012,17 @@ class APIRoutes:
lines = f.readlines() lines = f.readlines()
except FileNotFoundError: except FileNotFoundError:
lines = [] lines = []
new_lines = [line for line in lines if not line.startswith(f"{key}=")] new_lines = [line for line in lines if not line.startswith(f"{key}=")]
with open('variables.env', 'w', encoding='utf-8') as f: with open('variables.env', 'w', encoding='utf-8') as f:
f.writelines(new_lines) f.writelines(new_lines)
reload_env_vars('variables.env') reload_env_vars('variables.env')
logger.info(f"Deleted env var: {key}") logger.info(f"Deleted env var: {key}")
return JSONResponse(content={"status": "success", "message": f"Deleted {key}"}) return JSONResponse(content={"status": "success", "message": f"Deleted {key}"})
except Exception as e: except Exception as e:
logger.error(f"Error deleting env var: {e}", exc_info=True) logger.error(f"Error deleting env var: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)
@@ -714,8 +1037,10 @@ class APIRoutes:
try: try:
if not self.db_manager: if not self.db_manager:
raise HTTPException(status_code=500, detail="Database not initialized") raise HTTPException(status_code=500, detail="Database not initialized")
stats = await self.db_manager.get_detailed_statistics() stats = await self.db_manager.get_detailed_statistics()
return JSONResponse(content={"status": "success", "stats": serialize_for_json(stats)}) return JSONResponse(content={"status": "success", "stats": serialize_for_json(stats)})
except Exception as e: except Exception as e:
logger.error(f"Error getting database stats: {e}", exc_info=True) logger.error(f"Error getting database stats: {e}", exc_info=True)
return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500) return JSONResponse(content={"status": "error", "message": str(e)}, status_code=500)

File diff suppressed because it is too large Load Diff