2025-10-27 08:33:16 +03:00
"""
SSE ( Server - Sent Events ) для real - time обновлений через Redis Pub / Sub
"""
import asyncio
import json
from fastapi import APIRouter , Body
from fastapi . responses import StreamingResponse
from pydantic import BaseModel
from typing import Dict , Any
from app . services . redis_service import redis_service
2025-11-20 18:31:42 +03:00
from app . services . database import db
2026-02-24 16:17:59 +03:00
from app . config import settings
2025-10-27 08:33:16 +03:00
import logging
logger = logging . getLogger ( __name__ )
2025-12-04 12:22:23 +03:00
router = APIRouter ( prefix = " /api/v1 " , tags = [ " Events " ] )
2025-10-27 08:33:16 +03:00
2026-02-24 16:17:59 +03:00
# Типы для единого отображения на фронте: тип + текст (+ data для consumer_complaint)
DISPLAY_EVENT_TYPES = ( " trash_message " , " out_of_scope " , " consumer_consultation " , " consumer_complaint " )
def _normalize_display_event ( actual_event : dict ) - > dict :
"""
Приводит событие к формату { event_type , message [ , data ] } для единого отображения .
event_type — один из : trash_message ( красный ) , out_of_scope ( жёлтый ) ,
consumer_consultation ( синий ) , consumer_complaint ( зелёный ) .
"""
raw_type = actual_event . get ( " event_type " ) or actual_event . get ( " type " )
payload = actual_event . get ( " payload " ) or actual_event . get ( " data " ) or { }
if isinstance ( payload , str ) :
try :
payload = json . loads ( payload ) if payload else { }
except Exception :
payload = { }
if not isinstance ( payload , dict ) :
payload = { }
msg = ( actual_event . get ( " message " ) or payload . get ( " message " ) or " " ) . strip ( ) or " Ответ получен "
# Если n8n уже прислал один из четырёх типов — не перезаписываем, отдаём как есть (синий/зелёный не превращаем в жёлтый)
if raw_type in DISPLAY_EVENT_TYPES :
return {
" event_type " : raw_type ,
" message " : msg or " Ответ получен " ,
" data " : actual_event . get ( " data " , { } ) ,
" suggested_actions " : ( actual_event . get ( " suggested_actions " ) or payload . get ( " suggested_actions " ) ) if raw_type == " out_of_scope " else None ,
}
if raw_type == " trash_message " or payload . get ( " intent " ) == " trash " :
return {
" event_type " : " trash_message " ,
" message " : msg or " К сожалению, это обращение не по тематике." ,
" data " : actual_event . get ( " data " , { } ) ,
}
if raw_type == " out_of_scope " :
return {
" event_type " : " out_of_scope " ,
" message " : msg or " К сожалению, мы не можем помочь с этим вопросом." ,
" data " : actual_event . get ( " data " , { } ) ,
" suggested_actions " : actual_event . get ( " suggested_actions " ) or payload . get ( " suggested_actions " ) ,
}
if raw_type == " consumer_intent " :
intent = payload . get ( " intent " ) or actual_event . get ( " intent " )
if intent == " consultation " :
return {
" event_type " : " consumer_consultation " ,
" message " : msg or " Понял. Это похоже на консультацию. " ,
" data " : { } ,
}
return {
" event_type " : " consumer_complaint " ,
" message " : msg or " Обращение принято. " ,
" data " : actual_event . get ( " data " , { } ) ,
}
if raw_type == " documents_list_ready " :
return {
" event_type " : " consumer_complaint " ,
" message " : msg or " Подготовлен список документов. " ,
" data " : {
* * actual_event . get ( " data " , { } ) ,
" documents_required " : actual_event . get ( " documents_required " ) ,
" claim_id " : actual_event . get ( " claim_id " ) ,
} ,
}
if raw_type in ( " wizard_ready " , " wizard_plan_ready " , " claim_plan_ready " ) :
return {
" event_type " : " consumer_complaint " ,
" message " : msg or " План готов. " ,
" data " : actual_event . get ( " data " , actual_event ) ,
}
if raw_type == " ocr_status " and actual_event . get ( " status " ) == " ready " :
return {
" event_type " : " consumer_complaint " ,
" message " : msg or " Данные подтверждены. " ,
" data " : actual_event . get ( " data " , { } ) ,
}
# Если есть текст сообщения, но тип неизвестен — считаем out_of_scope, чтобы фронт точно показал ответ
if msg and msg . strip ( ) and raw_type not in (
" documents_list_ready " , " document_uploaded " , " document_ocr_completed " ,
" ocr_status " , " claim_ready " , " claim_plan_ready " , " claim_plan_error " ,
) :
return {
" event_type " : " out_of_scope " ,
" message " : msg . strip ( ) ,
" data " : actual_event . get ( " data " , { } ) ,
" suggested_actions " : actual_event . get ( " suggested_actions " ) ,
}
# Остальные события — прозрачно, только дополняем message
out = dict ( actual_event )
if " message " not in out or not out . get ( " message " ) :
out [ " message " ] = msg
return out
2025-10-27 08:33:16 +03:00
class EventPublish ( BaseModel ) :
""" Модель для публикации события """
event_type : str = " ocr_completed "
status : str
message : str
data : Dict [ str , Any ] = { }
timestamp : str = None
@router.post ( " /events/ {task_id} " )
async def publish_event ( task_id : str , event : EventPublish ) :
"""
Публикация события в Redis канал
2025-11-20 18:31:42 +03:00
Используется n8n для отправки событий ( OCR , AI , wizard и т . д . )
2025-10-27 08:33:16 +03:00
Args :
2025-11-20 18:31:42 +03:00
task_id : Session token ( например , sess - 1763201209156 - hyjye5u9h )
Используется для формирования канала ocr_events : { session_token }
2025-10-27 08:33:16 +03:00
event : Данные события
Returns :
Статус публикации
"""
try :
2025-11-20 18:31:42 +03:00
# task_id на самом деле это session_token
2025-10-27 08:33:16 +03:00
channel = f " ocr_events: { task_id } "
event_data = {
" event_type " : event . event_type ,
" status " : event . status ,
" message " : event . message ,
" data " : event . data ,
" timestamp " : event . timestamp
}
# Публикуем в Redis
event_json = json . dumps ( event_data , ensure_ascii = False )
await redis_service . publish ( channel , event_json )
logger . info ( f " 📢 Event published to { channel } : { event . status } " )
return {
" success " : True ,
" channel " : channel ,
" event " : event_data
}
except Exception as e :
logger . error ( f " ❌ Failed to publish event: { e } " )
return {
" success " : False ,
" error " : str ( e )
}
@router.get ( " /events/ {task_id} " )
async def stream_events ( task_id : str ) :
"""
2025-11-20 18:31:42 +03:00
SSE стрим событий обработки OCR , AI , wizard и т . д .
2025-10-27 08:33:16 +03:00
Args :
2025-11-20 18:31:42 +03:00
task_id : Session token ( например , sess - 1763201209156 - hyjye5u9h )
Используется для формирования канала ocr_events : { session_token }
Фронтенд подключается через EventSource к этому эндпоинту
2025-10-27 08:33:16 +03:00
Returns :
StreamingResponse с событиями
"""
2026-02-24 16:17:59 +03:00
logger . info (
" 🚀 SSE connection requested for session_token: %s → channel=ocr_events: %s (Redis %s : %s ) " ,
task_id , task_id , settings . redis_host , settings . redis_port ,
)
2025-10-27 08:33:16 +03:00
async def event_generator ( ) :
""" Генератор событий из Redis Pub/Sub """
2025-11-20 18:31:42 +03:00
# task_id на самом деле это session_token
2025-10-27 08:33:16 +03:00
channel = f " ocr_events: { task_id } "
# Подписываемся на канал Redis
2025-10-27 19:37:41 +03:00
pubsub = redis_service . client . pubsub ( )
2025-10-27 08:33:16 +03:00
await pubsub . subscribe ( channel )
2026-02-24 16:17:59 +03:00
logger . info (
" 📡 Subscribed to channel= %s on Redis %s : %s (проверка: redis-cli -h %s PUBSUB NUMSUB %s ) " ,
channel , settings . redis_host , settings . redis_port , settings . redis_host , channel ,
)
2025-10-27 08:33:16 +03:00
# Отправляем начальное событие
yield f " data: { json . dumps ( { ' status ' : ' connected ' , ' message ' : ' Подключено к событиям ' } ) } \n \n "
try :
# Слушаем события
while True :
2025-10-27 19:37:41 +03:00
logger . info ( f " ⏳ Waiting for message on { channel } ... " )
2025-11-19 18:46:48 +03:00
message = await pubsub . get_message ( ignore_subscribe_messages = True , timeout = 60.0 ) # Увеличено для RAG обработки
2025-10-27 08:33:16 +03:00
2025-10-27 19:37:41 +03:00
if message :
logger . info ( f " 📥 Received message type: { message [ ' type ' ] } " )
if message [ ' type ' ] == ' message ' :
event_data = message [ ' data ' ] # Уже строка (decode_responses=True)
logger . info ( f " 📦 Raw event data: { event_data [ : 200 ] } ... " )
event = json . loads ( event_data )
# Обработка формата от n8n Redis ноды (вложенный)
# Формат: {"claim_id": "...", "event": {...}}
if ' event ' in event and isinstance ( event [ ' event ' ] , dict ) :
# Извлекаем вложенное событие
actual_event = event [ ' event ' ]
logger . info ( f " 📦 Unwrapped n8n Redis format for { task_id } " )
else :
# Формат уже плоский (от backend API или старых источников)
actual_event = event
2025-11-26 19:54:51 +03:00
# ✅ Логируем полученное событие
event_type = actual_event . get ( ' event_type ' )
logger . info ( f " 🔍 Processing event: event_type= { event_type } , has claim_id= { bool ( actual_event . get ( ' claim_id ' ) ) } " )
# ✅ Обработка нового формата: documents_list_ready
if event_type == ' documents_list_ready ' :
logger . info ( f " 📋 Documents list received: { len ( actual_event . get ( ' documents_required ' , [ ] ) ) } documents " )
# Просто пропускаем дальше к yield
2025-11-20 18:31:42 +03:00
# ✅ Обработка формата от n8n: если пришёл объект с claim_id, но без event_type
# Это значит, что n8n пушит минимальный payload для wizard_ready
2025-11-26 19:54:51 +03:00
elif not event_type and actual_event . get ( ' claim_id ' ) :
2025-11-20 18:31:42 +03:00
logger . info ( f " 📦 Detected minimal wizard payload (no event_type), wrapping for claim_id= { actual_event . get ( ' claim_id ' ) } " )
# Обёртываем в правильный формат
actual_event = {
' event_type ' : ' wizard_ready ' ,
' status ' : ' ready ' ,
' message ' : ' Wizard plan готов ' ,
' data ' : actual_event , # Весь объект становится data
' timestamp ' : actual_event . get ( ' timestamp ' ) or None
}
logger . info ( f " ✅ Wrapped minimal payload into wizard_ready event " )
# Обработка события wizard_ready: загружаем данные из PostgreSQL
if actual_event . get ( ' event_type ' ) == ' wizard_ready ' and actual_event . get ( ' data ' , { } ) . get ( ' claim_id ' ) :
claim_id = actual_event [ ' data ' ] [ ' claim_id ' ]
logger . info ( f " 🔍 Wizard ready event received, loading data for claim_id= { claim_id } " )
try :
# Загружаем данные из PostgreSQL
query = """
SELECT
id ,
payload - >> ' claim_id ' as claim_id ,
session_token ,
unified_id ,
status_code ,
channel ,
payload ,
created_at ,
updated_at
FROM clpr_claims
WHERE ( payload - >> ' claim_id ' = $ 1 OR id : : text = $ 1 )
LIMIT 1
"""
row = await db . fetch_one ( query , claim_id )
if row :
# Обрабатываем payload - может быть строкой (JSONB) или уже dict
payload_raw = row . get ( ' payload ' )
if isinstance ( payload_raw , str ) :
try :
payload = json . loads ( payload_raw ) if payload_raw else { }
except ( json . JSONDecodeError , TypeError ) :
payload = { }
elif isinstance ( payload_raw , dict ) :
payload = payload_raw
else :
payload = { }
# Извлекаем claim_id из payload, если е г о нет в row
claim_id_from_payload = payload . get ( ' claim_id ' ) if isinstance ( payload , dict ) else None
final_claim_id = row . get ( ' claim_id ' ) or claim_id_from_payload or str ( row [ ' id ' ] )
# Обогащаем событие полными данными из PostgreSQL
# Добавляем данные и в data, и в корень для совместимости с фронтендом
actual_event [ ' data ' ] = {
* * actual_event . get ( ' data ' , { } ) ,
' wizard_plan ' : payload . get ( ' wizard_plan ' ) ,
' problem_description ' : payload . get ( ' problem_description ' ) ,
' wizard_answers ' : payload . get ( ' answers ' ) ,
' answers_prefill ' : payload . get ( ' answers_prefill ' ) ,
' documents_meta ' : payload . get ( ' documents_meta ' , [ ] ) ,
' ai_agent1_facts ' : payload . get ( ' ai_agent1_facts ' ) ,
' ai_agent13_rag ' : payload . get ( ' ai_agent13_rag ' ) ,
' coverage_report ' : payload . get ( ' coverage_report ' ) ,
' phone ' : payload . get ( ' phone ' ) ,
' email ' : payload . get ( ' email ' ) ,
}
# Также добавляем wizard_plan в корень для совместимости с фронтендом
actual_event [ ' wizard_plan ' ] = payload . get ( ' wizard_plan ' )
actual_event [ ' answers_prefill ' ] = payload . get ( ' answers_prefill ' )
actual_event [ ' coverage_report ' ] = payload . get ( ' coverage_report ' )
logger . info ( f " ✅ Wizard data loaded from PostgreSQL for claim_id= { final_claim_id } , has_wizard_plan= { payload . get ( ' wizard_plan ' ) is not None } " )
else :
logger . warning ( f " ⚠️ Claim not found in PostgreSQL: claim_id= { claim_id } " )
except Exception as e :
logger . error ( f " ❌ Error loading wizard data from PostgreSQL: { e } " )
2025-12-04 12:22:23 +03:00
# ✅ Обработка ocr_status ready: загружаем form_draft из PostgreSQL
if actual_event . get ( ' event_type ' ) == ' ocr_status ' and actual_event . get ( ' status ' ) == ' ready ' :
claim_id = actual_event . get ( ' claim_id ' ) or actual_event . get ( ' data ' , { } ) . get ( ' claim_id ' )
# ✅ Получаем cf_2624 из события (Данные подтверждены)
cf_2624 = actual_event . get ( ' cf_2624 ' )
if claim_id :
logger . info ( f " 🔍 OCR ready event received, loading form_draft for claim_id= { claim_id } , cf_2624= { cf_2624 } " )
try :
# ✅ Если есть cf_2624 в событии - сохраняем в черновик
if cf_2624 is not None :
try :
update_query = """
UPDATE clpr_claims
SET payload = jsonb_set (
COALESCE ( payload , ' {} ' : : jsonb ) ,
' {cf_2624} ' ,
$ 1 : : jsonb
)
WHERE id : : text = $ 2 OR payload - >> ' claim_id ' = $ 2
RETURNING id ;
"""
await db . execute ( update_query , json . dumps ( cf_2624 ) , claim_id )
logger . info ( f " ✅ Сохранён cf_2624= { cf_2624 } в черновик claim_id= { claim_id } " )
except Exception as e :
logger . warning ( f " ⚠️ Н е удалось сохранить cf_2624 в черновик: { e } " )
# Загружаем form_draft и documents из PostgreSQL
query = """
SELECT
c . id ,
c . payload - > ' form_draft ' as form_draft ,
c . payload - > ' documents_required ' as documents_required ,
c . payload - > ' documents_meta ' as documents_meta ,
c . payload - >> ' cf_2624 ' as cf_2624
FROM clpr_claims c
WHERE c . id : : text = $ 1 OR c . payload - >> ' claim_id ' = $ 1
LIMIT 1
"""
row = await db . fetch_one ( query , claim_id )
if row :
# Парсим JSONB поля (могут быть строками)
form_draft_raw = row . get ( ' form_draft ' )
documents_required_raw = row . get ( ' documents_required ' )
documents_meta_raw = row . get ( ' documents_meta ' )
cf_2624_from_db = row . get ( ' cf_2624 ' ) # ✅ Получаем cf_2624 из БД
# Парсим если строка
def parse_json_field ( val ) :
if val is None :
return None
if isinstance ( val , str ) :
try :
return json . loads ( val )
except :
return val
return val
form_draft = parse_json_field ( form_draft_raw )
documents_required = parse_json_field ( documents_required_raw )
documents_meta = parse_json_field ( documents_meta_raw )
# Обогащаем событие данными из БД
actual_event [ ' data ' ] = {
' claim_id ' : claim_id ,
' all_ready ' : True ,
' form_draft ' : form_draft ,
' documents_required ' : documents_required ,
' documents_meta ' : documents_meta ,
}
# ✅ Добавляем cf_2624 в событие (из БД или из события)
actual_event [ ' cf_2624 ' ] = cf_2624_from_db or cf_2624 or " 0 "
logger . info ( f " ✅ Form draft loaded from PostgreSQL for claim_id= { claim_id } , has_form_draft= { form_draft is not None } , cf_2624= { actual_event . get ( ' cf_2624 ' ) } " )
else :
logger . warning ( f " ⚠️ Claim not found in PostgreSQL: claim_id= { claim_id } " )
except Exception as e :
logger . error ( f " ❌ Error loading form_draft from PostgreSQL: { e } " )
2026-02-24 16:17:59 +03:00
# Единый формат для фронта: событие с полями event_type и message (и data при необходимости)
raw_event_type = actual_event . get ( " event_type " )
raw_status = actual_event . get ( " status " )
actual_event = _normalize_display_event ( actual_event )
2025-10-27 19:37:41 +03:00
# Отправляем событие клиенту (плоский формат)
2025-12-04 12:22:23 +03:00
event_json = json . dumps ( actual_event , ensure_ascii = False , default = str )
2026-02-24 16:17:59 +03:00
event_type_sent = actual_event . get ( " event_type " , " unknown " )
event_status = actual_event . get ( " status " ) or ( actual_event . get ( " data " ) or { } ) . get ( " status " ) or " unknown "
2025-12-04 12:22:23 +03:00
# Логируем размер и наличие данных
data_info = actual_event . get ( ' data ' , { } )
has_form_draft = ' form_draft ' in data_info if isinstance ( data_info , dict ) else False
logger . info ( f " 📤 Sending event to client: type= { event_type_sent } , status= { event_status } , json_len= { len ( event_json ) } , has_form_draft= { has_form_draft } " )
2025-10-27 19:37:41 +03:00
yield f " data: { event_json } \n \n "
# Если обработка завершена - закрываем соединение
2025-11-26 19:54:51 +03:00
# Н Е закрываем для documents_list_ready и document_ocr_completed (ждём ещё события)
2026-02-24 16:17:59 +03:00
if event_status in [ ' completed ' , ' error ' ] and ( raw_event_type or event_type_sent ) not in [ ' documents_list_ready ' , ' document_ocr_completed ' , ' document_uploaded ' ] :
2025-10-27 19:37:41 +03:00
logger . info ( f " ✅ Task { task_id } finished, closing SSE " )
break
2025-11-26 19:54:51 +03:00
2026-02-24 16:17:59 +03:00
# Закрываем для финальных событий (raw_event_type до нормализации)
if raw_event_type in [ ' claim_ready ' , ' claim_plan_ready ' , ' wizard_ready ' , ' wizard_plan_ready ' ] :
logger . info ( f " ✅ Final event { raw_event_type } sent, closing SSE " )
break
2025-11-26 19:54:51 +03:00
if event_type_sent in [ ' claim_ready ' , ' claim_plan_ready ' ] :
logger . info ( f " ✅ Final event { event_type_sent } sent, closing SSE " )
break
2025-12-04 12:22:23 +03:00
# Закрываем для ocr_status ready (форма заявления готова)
2026-02-24 16:17:59 +03:00
if raw_event_type == " ocr_status " and raw_status == " ready " :
logger . info ( " ✅ OCR ready event sent, closing SSE " )
2025-12-04 12:22:23 +03:00
break
2025-10-27 19:37:41 +03:00
else :
logger . info ( f " ⏰ Timeout waiting for message on { channel } " )
2025-10-27 08:33:16 +03:00
# Пинг каждые 30 сек чтобы соединение не закрылось
await asyncio . sleep ( 0.1 )
except asyncio . CancelledError :
logger . info ( f " ❌ Client disconnected from { channel } " )
finally :
await pubsub . unsubscribe ( channel )
await pubsub . close ( )
return StreamingResponse (
event_generator ( ) ,
media_type = " text/event-stream " ,
headers = {
" Cache-Control " : " no-cache " ,
" Connection " : " keep-alive " ,
" X-Accel-Buffering " : " no " # Отключаем буферизацию nginx
}
)
2025-11-24 13:36:14 +03:00
@router.get ( " /claim-plan/ {session_token} " )
async def stream_claim_plan ( session_token : str ) :
"""
SSE стрим для получения данных заявления из канала claim : plan : { session_token }
Используется после отправки формы визарда для получения данных заявления
от n8n workflow , которые затем отображаются в форме подтверждения .
Args :
session_token : Session token ( например , sess_c9e7c0c2 - de2e - 40 cd - ab7c - 3 bdc40282d34 )
Используется для формирования канала claim : plan : { session_token }
Returns :
StreamingResponse с данными заявления в формате :
{
" event_type " : " claim_plan_ready " ,
" status " : " ready " ,
" data " : {
" propertyName " : { . . . } , / / Данные заявления из n8n
. . .
}
}
"""
2026-02-24 16:17:59 +03:00
logger . info (
" 🚀 Claim plan SSE: session_token= %s → channel=claim:plan: %s (Redis %s : %s ) " ,
session_token , session_token , settings . redis_host , settings . redis_port ,
)
2025-11-24 13:36:14 +03:00
async def claim_plan_generator ( ) :
""" Генератор событий из Redis Pub/Sub для claim:plan канала """
channel = f " claim:plan: { session_token } "
# Подписываемся на канал Redis
pubsub = redis_service . client . pubsub ( )
await pubsub . subscribe ( channel )
2026-02-24 16:17:59 +03:00
logger . info (
" 📡 Subscribed to channel= %s on Redis %s : %s (PUBSUB NUMSUB %s ) " ,
channel , settings . redis_host , settings . redis_port , channel ,
)
2025-11-24 13:36:14 +03:00
# Отправляем начальное событие
yield f " data: { json . dumps ( { ' status ' : ' connected ' , ' message ' : ' Ожидание данных заявления... ' } ) } \n \n "
try :
# Слушаем события (таймаут 5 минут для обработки в n8n)
while True :
logger . info ( f " ⏳ Waiting for claim plan data on { channel } ... " )
message = await pubsub . get_message ( ignore_subscribe_messages = True , timeout = 300.0 )
if message :
logger . info ( f " 📥 Received claim plan message type: { message [ ' type ' ] } " )
if message [ ' type ' ] == ' message ' :
event_data_raw = message [ ' data ' ] # Уже строка (decode_responses=True)
logger . info ( f " 📦 Raw claim plan data length: { len ( event_data_raw ) } " )
try :
# Парсим данные от n8n
claim_data = json . loads ( event_data_raw )
# Формируем событие в стандартном формате
event = {
" event_type " : " claim_plan_ready " ,
" status " : " ready " ,
" message " : " Данные заявления готовы " ,
" data " : claim_data , # Весь объект от n8n
" timestamp " : None
}
logger . info ( f " ✅ Claim plan data received for session { session_token } " )
# Отправляем событие клиенту
event_json = json . dumps ( event , ensure_ascii = False )
yield f " data: { event_json } \n \n "
# После получения данных закрываем соединение
logger . info ( f " ✅ Claim plan sent to client, closing SSE " )
break
except json . JSONDecodeError as e :
logger . error ( f " ❌ Failed to parse claim plan JSON: { e } " )
error_event = {
" event_type " : " claim_plan_error " ,
" status " : " error " ,
" message " : f " Ошибка парсинга данных: { str ( e ) } " ,
" data " : { } ,
" timestamp " : None
}
yield f " data: { json . dumps ( error_event , ensure_ascii = False ) } \n \n "
break
else :
logger . info ( f " ⏰ Timeout waiting for claim plan on { channel } " )
# Отправляем timeout событие
timeout_event = {
" event_type " : " claim_plan_timeout " ,
" status " : " timeout " ,
" message " : " Превышено время ожидания данных заявления " ,
" data " : { } ,
" timestamp " : None
}
yield f " data: { json . dumps ( timeout_event , ensure_ascii = False ) } \n \n "
break
await asyncio . sleep ( 0.1 )
except asyncio . CancelledError :
logger . info ( f " ❌ Client disconnected from { channel } " )
except Exception as e :
logger . error ( f " ❌ Error in claim plan stream: { e } " )
error_event = {
" event_type " : " claim_plan_error " ,
" status " : " error " ,
" message " : f " Ошибка получения данных: { str ( e ) } " ,
" data " : { } ,
" timestamp " : None
}
yield f " data: { json . dumps ( error_event , ensure_ascii = False ) } \n \n "
finally :
await pubsub . unsubscribe ( channel )
await pubsub . close ( )
return StreamingResponse (
claim_plan_generator ( ) ,
media_type = " text/event-stream " ,
headers = {
" Cache-Control " : " no-cache " ,
" Connection " : " keep-alive " ,
" X-Accel-Buffering " : " no " # Отключаем буферизацию nginx
}
)