#!/usr/bin/env python3
"""
ui_websocket.py - WebSocket Connections and Real-time Updates

Handles WebSocket connections and broadcasts real-time status updates.
"""

import asyncio
import logging
from typing import Any, Awaitable, Callable, List

from fastapi import WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)

# Global WebSocket connection pool, shared by every coroutine in this module.
websocket_connections: List[WebSocket] = []


def _discard_connection(ws: WebSocket) -> None:
    """Remove *ws* from the connection pool if it is still registered."""
    try:
        websocket_connections.remove(ws)
    except ValueError:
        # Already removed by another coroutine (broadcaster or handler).
        pass


async def broadcast_to_websockets(message: dict) -> None:
    """Send *message* as JSON to all connected WebSocket clients.

    Clients whose send raises are dropped from the connection pool.

    Args:
        message: Payload passed to ``send_json`` on each connection.
    """
    stale = []
    # Iterate over a snapshot: each ``await`` yields to the event loop, and
    # other coroutines may add or remove connections while a send is pending.
    for ws in tuple(websocket_connections):
        try:
            await ws.send_json(message)
        except Exception as exc:
            # Best-effort broadcast: one failing client must not stop the rest.
            logger.debug("Dropping WebSocket client after failed send: %s", exc)
            stale.append(ws)

    # Remove disconnected clients
    for ws in stale:
        _discard_connection(ws)


async def broadcast_status_updates(
    get_status_func: Callable[[], Awaitable[Any]],
    interval: float = 2.0,
) -> None:
    """Background task that periodically broadcasts status to all clients.

    Runs until cancelled. Errors raised while fetching or sending the status
    are logged and the loop continues with the next cycle.

    Args:
        get_status_func: Zero-argument coroutine function; its awaited result
            is sent as the ``"data"`` field of a ``"status_update"`` message.
        interval: Seconds to sleep before each broadcast (default 2).
    """
    while True:
        try:
            await asyncio.sleep(interval)
            # Skip fetching the status when nobody is listening.
            if websocket_connections:
                status = await get_status_func()
                await broadcast_to_websockets({
                    "type": "status_update",
                    "data": status,
                })
        except asyncio.CancelledError:
            # On Python < 3.8 CancelledError derives from Exception; re-raise
            # so the task can always be cancelled.
            raise
        except Exception as e:
            logger.error("Error in broadcast task: %s", e)


async def handle_websocket_connection(websocket: WebSocket) -> None:
    """Accept a WebSocket, register it, and read from it until it closes.

    Incoming text messages are only logged at debug level. The connection is
    removed from the pool when the loop ends, whatever the reason.

    Args:
        websocket: The connection to accept and service.
    """
    await websocket.accept()
    websocket_connections.append(websocket)
    logger.info(
        "WebSocket connected. Total connections: %d", len(websocket_connections)
    )

    try:
        while True:
            # Waiting on receive keeps the handler alive; it raises
            # WebSocketDisconnect once the client goes away.
            data = await websocket.receive_text()
            logger.debug("Received WebSocket message: %s", data)
    except WebSocketDisconnect:
        logger.info("WebSocket disconnected normally")
    except Exception as e:
        logger.error("WebSocket error: %s", e)
    finally:
        # The broadcaster may already have dropped this connection after a
        # failed send, so removal must tolerate it being absent.
        _discard_connection(websocket)
        logger.info(
            "WebSocket removed. Total connections: %d", len(websocket_connections)
        )