""" SSE (Server-Sent Events) для real-time обновлений через Redis Pub/Sub """ import asyncio import json from fastapi import APIRouter, Body from fastapi.responses import StreamingResponse from pydantic import BaseModel from typing import Dict, Any from app.services.redis_service import redis_service import logging logger = logging.getLogger(__name__) router = APIRouter() class EventPublish(BaseModel): """Модель для публикации события""" event_type: str = "ocr_completed" status: str message: str data: Dict[str, Any] = {} timestamp: str = None @router.post("/events/{task_id}") async def publish_event(task_id: str, event: EventPublish): """ Публикация события в Redis канал Используется n8n для отправки событий (OCR, AI и т.д.) Args: task_id: ID задачи event: Данные события Returns: Статус публикации """ try: channel = f"ocr_events:{task_id}" event_data = { "event_type": event.event_type, "status": event.status, "message": event.message, "data": event.data, "timestamp": event.timestamp } # Публикуем в Redis event_json = json.dumps(event_data, ensure_ascii=False) await redis_service.publish(channel, event_json) logger.info(f"📢 Event published to {channel}: {event.status}") return { "success": True, "channel": channel, "event": event_data } except Exception as e: logger.error(f"❌ Failed to publish event: {e}") return { "success": False, "error": str(e) } @router.get("/events/{task_id}") async def stream_events(task_id: str): """ SSE стрим событий обработки OCR Args: task_id: ID задачи Returns: StreamingResponse с событиями """ async def event_generator(): """Генератор событий из Redis Pub/Sub""" channel = f"ocr_events:{task_id}" # Подписываемся на канал Redis pubsub = redis_service.redis.pubsub() await pubsub.subscribe(channel) logger.info(f"📡 Client subscribed to {channel}") # Отправляем начальное событие yield f"data: {json.dumps({'status': 'connected', 'message': 'Подключено к событиям'})}\n\n" try: # Слушаем события while True: message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=30.0) if message and message['type'] == 'message': event_data = message['data'].decode('utf-8') # Отправляем событие клиенту yield f"data: {event_data}\n\n" # Если обработка завершена - закрываем соединение event = json.loads(event_data) if event.get('status') in ['completed', 'error']: logger.info(f"✅ Task {task_id} finished, closing SSE") break # Пинг каждые 30 сек чтобы соединение не закрылось await asyncio.sleep(0.1) except asyncio.CancelledError: logger.info(f"❌ Client disconnected from {channel}") finally: await pubsub.unsubscribe(channel) await pubsub.close() return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Отключаем буферизацию nginx } )