#!/usr/bin/env python3
"""
utils.py - Utility Functions for Data Processing and Technical Indicators
Utility functions for data processing, technical indicators, validation, and configuration management
"""
import json
import logging
import os
import re
import tempfile
import shutil
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any, Union
import pandas as pd
import pandas_ta as ta
import numpy as np
from decimal import Decimal, ROUND_HALF_UP, InvalidOperation as DecimalException
from dotenv import load_dotenv
# Load environment variables
load_dotenv('variables.env')
def setup_logging(log_level: Optional[str] = None, log_file: Optional[str] = None):
    """Set up logging configuration"""
    # Use environment variables if parameters not provided
    if log_level is None:
        log_level = os.getenv('LOG_LEVEL', 'INFO')
    if log_file is None:
        log_file = os.getenv('LOG_FILE', 'crypto_collector.log')
    # Create logs directory if it doesn't exist
    os.makedirs("logs", exist_ok=True)
    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    date_format = "%Y-%m-%d %H:%M:%S"
    # Configure root logger (fall back to INFO if the level name is invalid)
    logging.basicConfig(
        level=getattr(logging, log_level.upper(), logging.INFO),
        format=log_format,
        datefmt=date_format,
        handlers=[
            logging.FileHandler(f"logs/{log_file}"),
            logging.StreamHandler()
        ]
    )
    # Set specific log levels for external libraries
    logging.getLogger("websockets").setLevel(logging.WARNING)
    logging.getLogger("asyncio").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("binance").setLevel(logging.WARNING)

def load_config(config_file: str = "config.conf") -> Dict[str, Any]:
    """Load configuration from JSON file"""
    logger = logging.getLogger(__name__)
    try:
        with open(config_file, 'r') as f:
            config = json.load(f)
        # Validate configuration structure
        validate_config(config)
        logger.debug(f"Successfully loaded config from {config_file}")
        return config
    except FileNotFoundError:
        logger.warning(f"Config file {config_file} not found, creating default")
        # Create default configuration if file doesn't exist
        default_config = create_default_config()
        save_config(default_config, config_file)
        return default_config
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON in configuration file: {e}")
        raise ValueError(f"Invalid JSON in configuration file: {e}")

def create_default_config() -> Dict[str, Any]:
    """Create default configuration"""
    return {
        "trading_pairs": [
            {"symbol": "BTCUSDT", "enabled": True, "priority": 1},
            {"symbol": "ETHUSDT", "enabled": True, "priority": 1},
            {"symbol": "BNBUSDT", "enabled": True, "priority": 2},
            {"symbol": "XRPUSDT", "enabled": True, "priority": 3},
            {"symbol": "SOLUSDT", "enabled": True, "priority": 2}
        ],
        "technical_indicators": {
            "enabled": ["sma", "ema", "rsi", "macd", "bb", "atr"],
            "periods": {
                "sma": [20, 50, 200],
                "ema": [12, 26],
                "rsi": [14],
                "macd": {"fast": 12, "slow": 26, "signal": 9},
                "bb": {"period": 20, "std": 2},
                "atr": [14],
                "stoch": {"k_period": 14, "d_period": 3},
                "adx": [14]
            },
            "calculation_intervals": ["1m", "5m", "15m", "1h", "4h", "1d"]
        },
        "collection": {
            "bulk_chunk_size": 1000,
            "websocket_reconnect_delay": 5,
            "tick_batch_size": 100,
            "candle_intervals": ["1m", "5m", "15m", "1h", "4h", "1d"],
            "max_retries": 3,
            "retry_delay": 1,
            "rate_limit_requests_per_minute": 2000,
            "concurrent_symbol_limit": 10
        },
        "database": {
            "batch_insert_size": 1000,
            "compression_after_days": 7,
            "retention_policy_days": 365,
            "vacuum_analyze_interval_hours": 24,
            "connection_pool": {
                "min_size": 10,
                "max_size": 50,
                "command_timeout": 60
            }
        },
        "ui": {
            "refresh_interval_seconds": 5,
            "max_chart_points": 1000,
            "default_timeframe": "1d",
            "theme": "dark",
            "enable_realtime_updates": True
        },
        "gap_filling": {
            "enable_auto_gap_filling": True,
            "auto_fill_schedule_hours": 24,
            "intervals_to_monitor": ["1m", "5m", "15m", "1h", "4h", "1d"],
            "max_gap_size_candles": 1000,
            "max_consecutive_empty_candles": 5,
            "averaging_lookback_candles": 10,
            "enable_intelligent_averaging": True,
            "max_fill_attempts": 3
        }
    }

def save_config(config: Dict[str, Any], config_file: str = "config.conf"):
    """Save configuration to JSON file using atomic write"""
    logger = logging.getLogger(__name__)
    try:
        # Validate before saving
        validate_config(config)
        # Get the directory of the config file
        config_dir = os.path.dirname(config_file) or '.'
        # Create a temporary file in the same directory
        temp_fd, temp_path = tempfile.mkstemp(
            dir=config_dir,
            prefix='.tmp_config_',
            suffix='.conf',
            text=True
        )
        try:
            # Write to temporary file
            with os.fdopen(temp_fd, 'w') as f:
                json.dump(config, f, indent=2, sort_keys=False)
                f.flush()
                os.fsync(f.fileno())  # Force write to disk
            # Atomic rename
            shutil.move(temp_path, config_file)
            logger.info(f"Configuration saved successfully to {config_file}")
        except Exception:
            # Clean up temp file on error
            try:
                os.unlink(temp_path)
            except OSError:
                pass
            raise
    except Exception as e:
        logger.error(f"Error saving config: {e}", exc_info=True)
        raise

def validate_config(config: Dict[str, Any]):
    """Validate configuration structure"""
    required_sections = ["trading_pairs", "technical_indicators", "collection", "database"]
    for section in required_sections:
        if section not in config:
            raise ValueError(f"Missing required configuration section: {section}")
    # Validate trading pairs
    if not isinstance(config["trading_pairs"], list):
        raise ValueError("trading_pairs must be a list")
    for pair in config["trading_pairs"]:
        if not isinstance(pair, dict) or "symbol" not in pair:
            raise ValueError("Invalid trading pair configuration")
        if not validate_symbol(pair["symbol"]):
            raise ValueError(f"Invalid symbol format: {pair['symbol']}")
        # Ensure required fields with defaults
        if "enabled" not in pair:
            pair["enabled"] = True
        if "priority" not in pair:
            pair["priority"] = 1
    # Validate technical indicators
    indicators_config = config["technical_indicators"]
    if "enabled" not in indicators_config or "periods" not in indicators_config:
        raise ValueError("Invalid technical indicators configuration")
    if not isinstance(indicators_config["enabled"], list):
        raise ValueError("technical_indicators.enabled must be a list")

def validate_symbol(symbol: str) -> bool:
    """Validate trading pair symbol format"""
    # Binance symbol format: base currency + quote currency (e.g., BTCUSDT)
    if not symbol or len(symbol) < 6:
        return False
    # Should be uppercase letters/numbers only
    if not re.match(r'^[A-Z0-9]+$', symbol):
        return False
    # Should end with common quote currencies
    quote_currencies = ['USDT', 'BUSD', 'BTC', 'ETH', 'BNB', 'USDC', 'TUSD', 'DAI']
    if not any(symbol.endswith(quote) for quote in quote_currencies):
        return False
    return True

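# Illustrative checks (these follow directly from the rules above):
#
#     validate_symbol("BTCUSDT")  # True  - uppercase, ends with USDT
#     validate_symbol("btcusdt")  # False - lowercase
#     validate_symbol("BTC")      # False - too short
#     validate_symbol("BTCEUR")   # False - EUR is not in the quote list
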
def reload_env_vars(env_file: str = 'variables.env'):
    """Reload environment variables from file"""
    load_dotenv(env_file, override=True)

def format_timestamp(timestamp: Union[int, float, str, datetime]) -> datetime:
    """Normalize a timestamp to a timezone-aware UTC datetime object"""
    if isinstance(timestamp, datetime):
        # Ensure timezone awareness
        if timestamp.tzinfo is None:
            return timestamp.replace(tzinfo=timezone.utc)
        return timestamp
    if isinstance(timestamp, str):
        try:
            # Try parsing ISO format first
            return datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        except ValueError:
            try:
                # Try parsing as numeric timestamp
                timestamp = float(timestamp)
            except ValueError:
                raise ValueError(f"Invalid timestamp string format: {timestamp}")
    if isinstance(timestamp, (int, float)):
        # Handle both seconds and milliseconds timestamps
        if timestamp > 1e10:  # Milliseconds
            timestamp = timestamp / 1000
        return datetime.fromtimestamp(timestamp, tz=timezone.utc)
    raise ValueError(f"Invalid timestamp format: {type(timestamp)}")

def parse_kline_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Parse Binance kline/candlestick data"""
    kline = data['k']
    return {
        'time': format_timestamp(kline['t']),
        'symbol': kline['s'],
        'exchange': 'binance',
        'interval': kline['i'],
        'open_price': Decimal(str(kline['o'])),
        'high_price': Decimal(str(kline['h'])),
        'low_price': Decimal(str(kline['l'])),
        'close_price': Decimal(str(kline['c'])),
        'volume': Decimal(str(kline['v'])),
        'quote_volume': Decimal(str(kline['q'])) if 'q' in kline else None,
        'trade_count': int(kline['n']) if 'n' in kline else None
    }

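# Illustrative parse (a sketch; field names follow Binance's kline websocket
# payload, and the numeric values here are made up):
#
#     msg = {'k': {'t': 1696500000000, 's': 'BTCUSDT', 'i': '1m',
#                  'o': '27650.10', 'h': '27660.00', 'l': '27645.55',
#                  'c': '27655.25', 'v': '12.345', 'q': '341000.5', 'n': 150}}
#     candle = parse_kline_data(msg)
#     candle['close_price']  # Decimal('27655.25')
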
def parse_trade_data(data: Dict[str, Any]) -> Dict[str, Any]:
    """Parse Binance trade data"""
    return {
        'time': format_timestamp(data['T']),
        'symbol': data['s'],
        'exchange': 'binance',
        'price': Decimal(str(data['p'])),
        'quantity': Decimal(str(data['q'])),
        'trade_id': int(data['t']),
        'is_buyer_maker': bool(data['m'])
    }

def calculate_technical_indicators(df: pd.DataFrame, indicators_config: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Calculate technical indicators using pandas_ta

    Args:
        df: DataFrame with OHLCV data (index: time, columns: open, high, low, close, volume)
        indicators_config: Configuration for indicators to calculate

    Returns:
        List of dictionaries with indicator data
    """
    if len(df) < 50:  # Need enough data for most indicators
        return []
    # Create a copy and ensure proper data types
    df_ta = df.copy()
    # Rename columns to match pandas_ta expectations if needed
    column_mapping = {
        'open_price': 'open',
        'high_price': 'high',
        'low_price': 'low',
        'close_price': 'close'
    }
    for old_col, new_col in column_mapping.items():
        if old_col in df_ta.columns and new_col not in df_ta.columns:
            df_ta.rename(columns={old_col: new_col}, inplace=True)
    # CRITICAL FIX: convert all columns to float64 to avoid numba pyobject errors.
    # This ensures pandas_ta's numba-compiled functions receive proper numeric types.
    required_columns = ['open', 'high', 'low', 'close', 'volume']
    for col in required_columns:
        if col in df_ta.columns:
            df_ta[col] = pd.to_numeric(df_ta[col], errors='coerce').astype(np.float64)
    # Remove any NaN values that may have been introduced
    df_ta = df_ta.dropna()
    if len(df_ta) < 50:  # Check again after cleaning
        return []
    indicators_data = []
    enabled_indicators = indicators_config.get('enabled', [])
    periods = indicators_config.get('periods', {})
    logger = logging.getLogger(__name__)
    try:
        for indicator in enabled_indicators:
            if indicator == 'sma':
                # Simple Moving Average
                for period in periods.get('sma', [20]):
                    try:
                        sma_values = ta.sma(df_ta['close'], length=period)
                        if sma_values is not None:
                            for idx, value in sma_values.dropna().items():
                                indicators_data.append({
                                    'time': idx,
                                    'indicator_name': f'sma_{period}',
                                    'indicator_value': round(float(value), 8),
                                    'metadata': json.dumps({'period': period})
                                })
                    except Exception as e:
                        logger.error(f"Error calculating SMA-{period}: {e}")
            elif indicator == 'ema':
                # Exponential Moving Average
                for period in periods.get('ema', [12, 26]):
                    try:
                        ema_values = ta.ema(df_ta['close'], length=period)
                        if ema_values is not None:
                            for idx, value in ema_values.dropna().items():
                                indicators_data.append({
                                    'time': idx,
                                    'indicator_name': f'ema_{period}',
                                    'indicator_value': round(float(value), 8),
                                    'metadata': json.dumps({'period': period})
                                })
                    except Exception as e:
                        logger.error(f"Error calculating EMA-{period}: {e}")
            elif indicator == 'rsi':
                # Relative Strength Index
                for period in periods.get('rsi', [14]):
                    try:
                        rsi_values = ta.rsi(df_ta['close'], length=period)
                        if rsi_values is not None:
                            for idx, value in rsi_values.dropna().items():
                                indicators_data.append({
                                    'time': idx,
                                    'indicator_name': f'rsi_{period}',
                                    'indicator_value': round(float(value), 8),
                                    'metadata': json.dumps({'period': period})
                                })
                    except Exception as e:
                        logger.error(f"Error calculating RSI-{period}: {e}")
            elif indicator == 'macd':
                # MACD
                macd_config = periods.get('macd', {'fast': 12, 'slow': 26, 'signal': 9})
                try:
                    macd_result = ta.macd(
                        df_ta['close'],
                        fast=macd_config['fast'],
                        slow=macd_config['slow'],
                        signal=macd_config['signal']
                    )
                    if macd_result is not None:
                        # MACD Line
                        macd_col = f"MACD_{macd_config['fast']}_{macd_config['slow']}_{macd_config['signal']}"
                        if macd_col in macd_result.columns:
                            for idx, value in macd_result[macd_col].dropna().items():
                                indicators_data.append({
                                    'time': idx,
                                    'indicator_name': 'macd_line',
                                    'indicator_value': round(float(value), 8),
                                    'metadata': json.dumps(macd_config)
                                })
                        # MACD Signal
                        signal_col = f"MACDs_{macd_config['fast']}_{macd_config['slow']}_{macd_config['signal']}"
                        if signal_col in macd_result.columns:
                            for idx, value in macd_result[signal_col].dropna().items():
                                indicators_data.append({
                                    'time': idx,
                                    'indicator_name': 'macd_signal',
                                    'indicator_value': round(float(value), 8),
                                    'metadata': json.dumps(macd_config)
                                })
                        # MACD Histogram
                        hist_col = f"MACDh_{macd_config['fast']}_{macd_config['slow']}_{macd_config['signal']}"
                        if hist_col in macd_result.columns:
                            for idx, value in macd_result[hist_col].dropna().items():
                                indicators_data.append({
                                    'time': idx,
                                    'indicator_name': 'macd_histogram',
                                    'indicator_value': round(float(value), 8),
                                    'metadata': json.dumps(macd_config)
                                })
                except Exception as e:
                    logger.error(f"Error calculating MACD: {e}")
            elif indicator == 'bb':
                # Bollinger Bands
                bb_config = periods.get('bb', {'period': 20, 'std': 2})
                try:
                    bb_result = ta.bbands(
                        df_ta['close'],
                        length=bb_config['period'],
                        std=bb_config['std']
                    )
                    if bb_result is not None:
                        # Upper Band
                        for col in bb_result.columns:
                            if col.startswith(f"BBU_{bb_config['period']}"):
                                for idx, value in bb_result[col].dropna().items():
                                    indicators_data.append({
                                        'time': idx,
                                        'indicator_name': 'bb_upper',
                                        'indicator_value': round(float(value), 8),
                                        'metadata': json.dumps(bb_config)
                                    })
                                break
                        # Middle Band
                        for col in bb_result.columns:
                            if col.startswith(f"BBM_{bb_config['period']}"):
                                for idx, value in bb_result[col].dropna().items():
                                    indicators_data.append({
                                        'time': idx,
                                        'indicator_name': 'bb_middle',
                                        'indicator_value': round(float(value), 8),
                                        'metadata': json.dumps(bb_config)
                                    })
                                break
                        # Lower Band
                        for col in bb_result.columns:
                            if col.startswith(f"BBL_{bb_config['period']}"):
                                for idx, value in bb_result[col].dropna().items():
                                    indicators_data.append({
                                        'time': idx,
                                        'indicator_name': 'bb_lower',
                                        'indicator_value': round(float(value), 8),
                                        'metadata': json.dumps(bb_config)
                                    })
                                break
                except Exception as e:
                    logger.error(f"Error calculating Bollinger Bands: {e}")
            elif indicator == 'atr':
                # Average True Range
                for period in periods.get('atr', [14]):
                    try:
                        atr_values = ta.atr(df_ta['high'], df_ta['low'], df_ta['close'], length=period)
                        if atr_values is not None:
                            for idx, value in atr_values.dropna().items():
                                indicators_data.append({
                                    'time': idx,
                                    'indicator_name': f'atr_{period}',
                                    'indicator_value': round(float(value), 8),
                                    'metadata': json.dumps({'period': period})
                                })
                    except Exception as e:
                        logger.error(f"Error calculating ATR-{period}: {e}")
            elif indicator == 'stoch':
                # Stochastic Oscillator
                stoch_config = periods.get('stoch', {'k_period': 14, 'd_period': 3})
                try:
                    stoch_result = ta.stoch(
                        df_ta['high'], df_ta['low'], df_ta['close'],
                        k=stoch_config['k_period'],
                        d=stoch_config['d_period']
                    )
                    if stoch_result is not None:
                        # %K
                        for col in stoch_result.columns:
                            if 'STOCHk' in col:
                                for idx, value in stoch_result[col].dropna().items():
                                    indicators_data.append({
                                        'time': idx,
                                        'indicator_name': 'stoch_k',
                                        'indicator_value': round(float(value), 8),
                                        'metadata': json.dumps(stoch_config)
                                    })
                                break
                        # %D
                        for col in stoch_result.columns:
                            if 'STOCHd' in col:
                                for idx, value in stoch_result[col].dropna().items():
                                    indicators_data.append({
                                        'time': idx,
                                        'indicator_name': 'stoch_d',
                                        'indicator_value': round(float(value), 8),
                                        'metadata': json.dumps(stoch_config)
                                    })
                                break
                except Exception as e:
                    logger.error(f"Error calculating Stochastic: {e}")
            elif indicator == 'adx':
                # Average Directional Index
                for period in periods.get('adx', [14]):
                    try:
                        adx_result = ta.adx(df_ta['high'], df_ta['low'], df_ta['close'], length=period)
                        if adx_result is not None:
                            adx_col = f"ADX_{period}"
                            if adx_col in adx_result.columns:
                                for idx, value in adx_result[adx_col].dropna().items():
                                    indicators_data.append({
                                        'time': idx,
                                        'indicator_name': f'adx_{period}',
                                        'indicator_value': round(float(value), 8),
                                        'metadata': json.dumps({'period': period})
                                    })
                    except Exception as e:
                        logger.error(f"Error calculating ADX-{period}: {e}")
    except Exception as e:
        logger.error(f"Error calculating technical indicators: {e}", exc_info=True)
    return indicators_data

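# Illustrative call (a sketch; the random-walk frame below is synthetic and
# only needs >= 50 rows with a DatetimeIndex and open/high/low/close/volume):
#
#     idx = pd.date_range('2024-01-01', periods=100, freq='1min', tz='UTC')
#     close = 100 + np.cumsum(np.random.randn(100))
#     frame = pd.DataFrame({'open': close, 'high': close + 1,
#                           'low': close - 1, 'close': close,
#                           'volume': np.ones(100)}, index=idx)
#     rows = calculate_technical_indicators(
#         frame, {'enabled': ['sma', 'rsi'], 'periods': {'sma': [20], 'rsi': [14]}})
#     # rows -> [{'time': ..., 'indicator_name': 'sma_20', ...}, ...]
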
def resample_ticks_to_ohlcv(ticks: List[Dict[str, Any]], interval: str) -> List[Dict[str, Any]]:
    """
    Resample tick data to OHLCV format

    Args:
        ticks: List of tick data dictionaries
        interval: Resampling interval as a pandas offset alias (e.g., '1min', '5min', '1h')

    Returns:
        List of OHLCV dictionaries
    """
    if not ticks:
        return []
    # Convert to DataFrame
    df = pd.DataFrame(ticks)
    df['time'] = pd.to_datetime(df['time'])
    df.set_index('time', inplace=True)
    # Convert price and quantity to float
    df['price'] = pd.to_numeric(df['price'], errors='coerce')
    df['quantity'] = pd.to_numeric(df['quantity'], errors='coerce')
    # Group by symbol and resample
    ohlcv_data = []
    for symbol in df['symbol'].unique():
        symbol_df = df[df['symbol'] == symbol].copy()
        # Resample price data
        ohlcv = symbol_df['price'].resample(interval).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last'
        })
        # Resample volume and trade count
        volume = symbol_df['quantity'].resample(interval).sum()
        trade_count = symbol_df.resample(interval).size()
        # Combine data
        for timestamp, row in ohlcv.iterrows():
            if pd.notna(row['open']):  # Skip empty periods
                ohlcv_data.append({
                    'time': timestamp,
                    'symbol': symbol,
                    'exchange': symbol_df['exchange'].iloc[0] if 'exchange' in symbol_df.columns else 'binance',
                    'interval': interval,
                    'open_price': Decimal(str(row['open'])).quantize(Decimal('0.00000001'), rounding=ROUND_HALF_UP),
                    'high_price': Decimal(str(row['high'])).quantize(Decimal('0.00000001'), rounding=ROUND_HALF_UP),
                    'low_price': Decimal(str(row['low'])).quantize(Decimal('0.00000001'), rounding=ROUND_HALF_UP),
                    'close_price': Decimal(str(row['close'])).quantize(Decimal('0.00000001'), rounding=ROUND_HALF_UP),
                    'volume': Decimal(str(volume.loc[timestamp])) if timestamp in volume.index else Decimal('0'),
                    'quote_volume': None,
                    'trade_count': int(trade_count.loc[timestamp]) if timestamp in trade_count.index else 0
                })
    return ohlcv_data

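# Illustrative resample (a sketch; two synthetic ticks that fall in the same
# one-minute bucket, so a single OHLCV row comes back):
#
#     ticks = [
#         {'time': '2024-01-01T00:00:01Z', 'symbol': 'BTCUSDT',
#          'price': '100.0', 'quantity': '1.0'},
#         {'time': '2024-01-01T00:00:30Z', 'symbol': 'BTCUSDT',
#          'price': '101.0', 'quantity': '2.0'},
#     ]
#     bars = resample_ticks_to_ohlcv(ticks, '1min')
#     # bars[0]['open_price'] == Decimal('100.00000000'), volume == Decimal('3.0')
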
def validate_ohlcv_data(ohlcv: Dict[str, Any]) -> bool:
    """Validate OHLCV data integrity"""
    try:
        # Check required fields
        required_fields = ['time', 'symbol', 'open_price', 'high_price', 'low_price', 'close_price', 'volume']
        for field in required_fields:
            if field not in ohlcv:
                return False
        # Check price relationships
        high = float(ohlcv['high_price'])
        low = float(ohlcv['low_price'])
        open_price = float(ohlcv['open_price'])
        close = float(ohlcv['close_price'])
        # High should be >= all other prices
        if high < max(low, open_price, close):
            return False
        # Low should be <= all other prices
        if low > min(high, open_price, close):
            return False
        # All prices should be positive
        if any(price <= 0 for price in [high, low, open_price, close]):
            return False
        # Volume should be non-negative
        if float(ohlcv['volume']) < 0:
            return False
        return True
    except (ValueError, TypeError, KeyError):
        return False

def calculate_price_change(current_price: float, previous_price: float) -> Dict[str, float]:
    """Calculate price change and percentage change"""
    if previous_price == 0:
        return {'change': 0.0, 'change_percent': 0.0}
    change = current_price - previous_price
    change_percent = (change / previous_price) * 100
    return {
        'change': round(change, 8),
        'change_percent': round(change_percent, 4)
    }

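# Worked example: moving from 100.0 to 105.0 gives
# change = 105.0 - 100.0 = 5.0 and change_percent = 5.0 / 100.0 * 100 = 5.0:
#
#     calculate_price_change(105.0, 100.0)
#     # {'change': 5.0, 'change_percent': 5.0}
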
def format_volume(volume: Union[int, float, Decimal]) -> str:
    """Format volume for display"""
    volume = float(volume)
    if volume >= 1e9:
        return f"{volume / 1e9:.2f}B"
    elif volume >= 1e6:
        return f"{volume / 1e6:.2f}M"
    elif volume >= 1e3:
        return f"{volume / 1e3:.2f}K"
    else:
        return f"{volume:.2f}"

def get_interval_seconds(interval: str) -> int:
    """Convert interval string to seconds"""
    interval_map = {
        '1s': 1,
        '1m': 60,
        '3m': 180,
        '5m': 300,
        '15m': 900,
        '30m': 1800,
        '1h': 3600,
        '2h': 7200,
        '4h': 14400,
        '6h': 21600,
        '8h': 28800,
        '12h': 43200,
        '1d': 86400,
        '3d': 259200,
        '1w': 604800,
        '1M': 2592000  # Approximate (30 days)
    }
    return interval_map.get(interval, 60)  # Default to 1 minute

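# Illustrative lookups (unknown intervals fall back to 60 seconds):
#
#     get_interval_seconds('4h')  # 14400
#     get_interval_seconds('1d')  # 86400
#     get_interval_seconds('7m')  # 60 (not in the map, so the default)
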
def safe_decimal_conversion(value: Any) -> Optional[Decimal]:
    """Safely convert value to Decimal"""
    try:
        if value is None or value == '':
            return None
        return Decimal(str(value)).quantize(Decimal('0.00000001'), rounding=ROUND_HALF_UP)
    except (ValueError, TypeError, DecimalException):
        return None

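# Illustrative conversions (a sketch; anything unparseable comes back as None,
# and results are quantized to 8 decimal places with half-up rounding):
#
#     safe_decimal_conversion('27655.251234567')  # Decimal('27655.25123457')
#     safe_decimal_conversion('')                 # None
#     safe_decimal_conversion('not-a-number')     # None
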
def batch_data(data: List[Any], batch_size: int) -> List[List[Any]]:
    """Split data into batches"""
    batches = []
    for i in range(0, len(data), batch_size):
        batches.append(data[i:i + batch_size])
    return batches

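# Illustrative batching (the last batch may be short):
#
#     batch_data([1, 2, 3, 4, 5], 2)
#     # [[1, 2], [3, 4], [5]]
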
def get_binance_symbol_info(symbol: str) -> Dict[str, Any]:
    """Get symbol information for validation"""
    # This is a simplified version - in production you might want to fetch from the Binance API
    common_symbols = {
        'BTCUSDT': {'baseAsset': 'BTC', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'ETHUSDT': {'baseAsset': 'ETH', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'BNBUSDT': {'baseAsset': 'BNB', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'XRPUSDT': {'baseAsset': 'XRP', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'SOLUSDT': {'baseAsset': 'SOL', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'ADAUSDT': {'baseAsset': 'ADA', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'DOTUSDT': {'baseAsset': 'DOT', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'LINKUSDT': {'baseAsset': 'LINK', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'LTCUSDT': {'baseAsset': 'LTC', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'HBARUSDT': {'baseAsset': 'HBAR', 'quoteAsset': 'USDT', 'status': 'TRADING'},
        'HBARBTC': {'baseAsset': 'HBAR', 'quoteAsset': 'BTC', 'status': 'TRADING'}
    }
    return common_symbols.get(symbol, {'status': 'UNKNOWN'})

class DataValidator:
    """Class for validating trading data"""

    @staticmethod
    def validate_tick_data(tick: Dict[str, Any]) -> bool:
        """Validate tick/trade data"""
        try:
            required_fields = ['time', 'symbol', 'price', 'quantity', 'trade_id']
            for field in required_fields:
                if field not in tick:
                    return False
            # Validate data types and ranges
            if float(tick['price']) <= 0:
                return False
            if float(tick['quantity']) <= 0:
                return False
            if not isinstance(tick['trade_id'], (int, str)):
                return False
            if not validate_symbol(tick['symbol']):
                return False
            return True
        except (ValueError, TypeError):
            return False

    @staticmethod
    def validate_indicators_data(indicators: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Validate and clean indicators data"""
        valid_indicators = []
        for indicator in indicators:
            try:
                if ('time' in indicator and
                        'indicator_name' in indicator and
                        'indicator_value' in indicator):
                    # Check for valid numeric value
                    value = float(indicator['indicator_value'])
                    if not (np.isnan(value) or np.isinf(value)):
                        valid_indicators.append(indicator)
            except (ValueError, TypeError):
                continue
        return valid_indicators

def create_error_response(error_message: str, error_code: str = "GENERAL_ERROR") -> Dict[str, Any]:
    """Create standardized error response"""
    return {
        "success": False,
        "error": {
            "code": error_code,
            "message": error_message,
            # Use timezone-aware UTC (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    }

def create_success_response(data: Any = None, message: str = "Success") -> Dict[str, Any]:
    """Create standardized success response"""
    response = {
        "success": True,
        "message": message,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    if data is not None:
        response["data"] = data
    return response

class PerformanceTimer:
    """Context manager for timing operations"""

    def __init__(self, operation_name: str):
        self.operation_name = operation_name
        self.start_time = None
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
        self.start_time = datetime.now(timezone.utc)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.start_time:
            duration = (datetime.now(timezone.utc) - self.start_time).total_seconds()
            # Log slow operations
            slow_threshold = float(os.getenv('SLOW_QUERY_THRESHOLD_MS', '1000')) / 1000
            if duration > slow_threshold:
                self.logger.warning(f"SLOW OPERATION: {self.operation_name} took {duration:.3f}s")
            else:
                self.logger.debug(f"{self.operation_name} completed in {duration:.3f}s")

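# Illustrative usage (a sketch; SLOW_QUERY_THRESHOLD_MS is read from the
# environment, defaulting to 1000 ms, and insert_klines is a hypothetical helper):
#
#     with PerformanceTimer("bulk kline insert"):
#         insert_klines(rows)  # any slow call goes here
#     # logs a WARNING if the block exceeds the threshold, DEBUG otherwise
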
# Export main functions
__all__ = [
    'setup_logging', 'load_config', 'save_config', 'validate_config',
    'create_default_config', 'validate_symbol', 'format_timestamp',
    'parse_kline_data', 'parse_trade_data', 'calculate_technical_indicators',
    'resample_ticks_to_ohlcv', 'validate_ohlcv_data', 'calculate_price_change',
    'format_volume', 'get_interval_seconds', 'safe_decimal_conversion',
    'batch_data', 'get_binance_symbol_info', 'DataValidator',
    'create_error_response', 'create_success_response', 'PerformanceTimer',
    'reload_env_vars'
]