2025-10-27 08:33:16 +03:00
|
|
|
|
"""
|
|
|
|
|
|
SSE (Server-Sent Events) для real-time обновлений через Redis Pub/Sub
|
|
|
|
|
|
"""
|
|
|
|
|
|
import asyncio
import json
import logging
from typing import Any, Dict, Optional

from fastapi import APIRouter, Body
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from app.services.database import db
from app.services.redis_service import redis_service
|
|
|
|
|
|
|
|
|
|
|
|
# Module-level logger; inherits configuration from the application root logger.
logger = logging.getLogger(__name__)

# Router mounted by the application; exposes POST/GET /events/{task_id}.
router = APIRouter()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EventPublish(BaseModel):
    """Request body for publishing an event to a session's Redis channel."""

    # Kind of event, e.g. "ocr_completed", "wizard_ready".
    event_type: str = "ocr_completed"
    # Processing status, e.g. "processing", "completed", "error".
    status: str
    # Human-readable message forwarded to the SSE client.
    message: str
    # Arbitrary event payload (pydantic deep-copies the default per instance).
    data: Dict[str, Any] = {}
    # Optional ISO-8601 timestamp; published as null when absent.
    # FIX: was annotated `str = None`, which is not a valid default for a
    # plain `str` field — the field is genuinely optional.
    timestamp: Optional[str] = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/events/{task_id}")
async def publish_event(task_id: str, event: EventPublish):
    """
    Publish an event to the session's Redis Pub/Sub channel.

    Used by n8n to push events (OCR, AI, wizard, etc.) to connected
    SSE clients.

    Args:
        task_id: Session token (e.g. sess-1763201209156-hyjye5u9h);
            forms the channel name ocr_events:{session_token}.
        event: Event payload to publish.

    Returns:
        dict with publication status; on failure, ``success`` is False
        and ``error`` carries the exception text (no HTTP error raised).
    """
    try:
        # task_id is actually the session_token
        channel = f"ocr_events:{task_id}"

        event_data = {
            "event_type": event.event_type,
            "status": event.status,
            "message": event.message,
            "data": event.data,
            "timestamp": event.timestamp,
        }

        # Serialize once and push to Redis (non-ASCII kept readable)
        serialized = json.dumps(event_data, ensure_ascii=False)
        await redis_service.publish(channel, serialized)

        logger.info(f"📢 Event published to {channel}: {event.status}")

        return {
            "success": True,
            "channel": channel,
            "event": event_data,
        }
    except Exception as e:
        logger.error(f"❌ Failed to publish event: {e}")
        return {
            "success": False,
            "error": str(e),
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _unwrap_n8n_event(event: dict, task_id: str) -> dict:
    """Unwrap the nested n8n Redis-node format {"claim_id": ..., "event": {...}}; pass flat events through."""
    if 'event' in event and isinstance(event['event'], dict):
        logger.info(f"📦 Unwrapped n8n Redis format for {task_id}")
        return event['event']
    # Already flat (from the backend API or legacy sources)
    return event


def _wrap_minimal_wizard_payload(actual_event: dict) -> dict:
    """Wrap a minimal n8n payload (claim_id present, event_type absent) into a wizard_ready event."""
    logger.info(f"🔍 Checking event: has event_type={bool(actual_event.get('event_type'))}, has claim_id={bool(actual_event.get('claim_id'))}")
    if not actual_event.get('event_type') and actual_event.get('claim_id'):
        logger.info(f"📦 Detected minimal wizard payload (no event_type), wrapping for claim_id={actual_event.get('claim_id')}")
        wrapped = {
            'event_type': 'wizard_ready',
            'status': 'ready',
            'message': 'Wizard plan готов',
            'data': actual_event,  # the whole original object becomes data
            'timestamp': actual_event.get('timestamp') or None,
        }
        logger.info(f"✅ Wrapped minimal payload into wizard_ready event")
        return wrapped
    return actual_event


async def _enrich_wizard_event(actual_event: dict) -> None:
    """Enrich a wizard_ready event in place with full claim data loaded from PostgreSQL (best-effort)."""
    claim_id = actual_event['data']['claim_id']
    logger.info(f"🔍 Wizard ready event received, loading data for claim_id={claim_id}")
    try:
        query = """
            SELECT
                id,
                payload->>'claim_id' as claim_id,
                session_token,
                unified_id,
                status_code,
                channel,
                payload,
                created_at,
                updated_at
            FROM clpr_claims
            WHERE (payload->>'claim_id' = $1 OR id::text = $1)
            LIMIT 1
        """
        row = await db.fetch_one(query, claim_id)

        if not row:
            logger.warning(f"⚠️ Claim not found in PostgreSQL: claim_id={claim_id}")
            return

        # payload may arrive as a JSONB string or an already-parsed dict
        payload_raw = row.get('payload')
        if isinstance(payload_raw, str):
            try:
                payload = json.loads(payload_raw) if payload_raw else {}
            except (json.JSONDecodeError, TypeError):
                payload = {}
        elif isinstance(payload_raw, dict):
            payload = payload_raw
        else:
            payload = {}

        # Fall back to payload (then the row id) when the row lacks claim_id
        claim_id_from_payload = payload.get('claim_id') if isinstance(payload, dict) else None
        final_claim_id = row.get('claim_id') or claim_id_from_payload or str(row['id'])

        # Enrich the event with full data; duplicated in `data` and at the
        # top level for frontend compatibility
        actual_event['data'] = {
            **actual_event.get('data', {}),
            'wizard_plan': payload.get('wizard_plan'),
            'problem_description': payload.get('problem_description'),
            'wizard_answers': payload.get('answers'),
            'answers_prefill': payload.get('answers_prefill'),
            'documents_meta': payload.get('documents_meta', []),
            'ai_agent1_facts': payload.get('ai_agent1_facts'),
            'ai_agent13_rag': payload.get('ai_agent13_rag'),
            'coverage_report': payload.get('coverage_report'),
            'phone': payload.get('phone'),
            'email': payload.get('email'),
        }
        actual_event['wizard_plan'] = payload.get('wizard_plan')
        actual_event['answers_prefill'] = payload.get('answers_prefill')
        actual_event['coverage_report'] = payload.get('coverage_report')

        logger.info(f"✅ Wizard data loaded from PostgreSQL for claim_id={final_claim_id}, has_wizard_plan={payload.get('wizard_plan') is not None}")
    except Exception as e:
        # Best-effort: a DB failure must not kill the SSE stream
        logger.error(f"❌ Error loading wizard data from PostgreSQL: {e}")


@router.get("/events/{task_id}")
async def stream_events(task_id: str):
    """
    SSE stream of OCR, AI, wizard (etc.) processing events.

    Args:
        task_id: Session token (e.g. sess-1763201209156-hyjye5u9h);
            forms the channel name ocr_events:{session_token}.
            The frontend connects to this endpoint via EventSource.

    Returns:
        StreamingResponse emitting text/event-stream events.
    """
    logger.info(f"🚀 SSE connection requested for session_token: {task_id}")

    async def event_generator():
        """Event generator fed from Redis Pub/Sub."""
        # task_id is actually the session_token
        channel = f"ocr_events:{task_id}"

        # Subscribe to the Redis channel
        pubsub = redis_service.client.pubsub()
        await pubsub.subscribe(channel)
        logger.info(f"📡 Client subscribed to {channel}")

        # Initial handshake event so the client knows the stream is live
        yield f"data: {json.dumps({'status': 'connected', 'message': 'Подключено к событиям'})}\n\n"

        try:
            while True:
                logger.info(f"⏳ Waiting for message on {channel}...")
                # Long timeout to accommodate slow RAG processing
                message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=60.0)

                if message:
                    logger.info(f"📥 Received message type: {message['type']}")
                    if message['type'] == 'message':
                        event_data = message['data']  # already a str (decode_responses=True)
                        logger.info(f"📦 Raw event data: {event_data[:200]}...")

                        actual_event = _unwrap_n8n_event(json.loads(event_data), task_id)
                        actual_event = _wrap_minimal_wizard_payload(actual_event)

                        # wizard_ready events get enriched from PostgreSQL
                        if actual_event.get('event_type') == 'wizard_ready' and actual_event.get('data', {}).get('claim_id'):
                            await _enrich_wizard_event(actual_event)

                        # Forward the (flat-format) event to the client
                        event_json = json.dumps(actual_event, ensure_ascii=False)
                        logger.info(f"📤 Sending event to client: {actual_event.get('status', 'unknown')}")
                        yield f"data: {event_json}\n\n"

                        # Terminal status — close the connection
                        if actual_event.get('status') in ['completed', 'error', 'success']:
                            logger.info(f"✅ Task {task_id} finished, closing SSE")
                            break
                else:
                    logger.info(f"⏰ Timeout waiting for message on {channel}")
                    # FIX: actually emit a keepalive. The original only slept
                    # here although its comment promised a ping, so proxies
                    # could drop the idle connection. An SSE comment line
                    # (": ...") is ignored by EventSource clients.
                    yield ": keepalive\n\n"
                    await asyncio.sleep(0.1)

        except asyncio.CancelledError:
            logger.info(f"❌ Client disconnected from {channel}")
        finally:
            await pubsub.unsubscribe(channel)
            await pubsub.close()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable nginx buffering
        },
    )
|
|
|
|
|
|
|