Добавлен field_label в результат переименования файлов, исправлена загрузка черновиков, обновлен формат пути S3 с project_name

This commit is contained in:
Fedor
2025-11-22 09:38:38 +03:00
parent d3ba054027
commit 486f3619ff
212 changed files with 6704 additions and 123 deletions

View File

@@ -460,6 +460,85 @@ async def get_claim(claim_id: str):
}
@router.get("/wizard/load/{claim_id}")
async def load_wizard_data(claim_id: str):
"""
Загрузить данные визарда из PostgreSQL по claim_id
Используется после получения claim_id из ocr_events.
Возвращает полные данные для построения формы (wizard_plan, problem_description и т.д.)
"""
try:
logger.info(f"🔍 Загрузка данных визарда для claim_id={claim_id}")
# Ищем заявку по claim_id (может быть UUID или строка CLM-...)
query = """
SELECT
id,
payload->>'claim_id' as claim_id,
session_token,
unified_id,
status_code,
channel,
payload,
created_at,
updated_at
FROM clpr_claims
WHERE (payload->>'claim_id' = $1 OR id::text = $1)
LIMIT 1
"""
row = await db.fetch_one(query, claim_id)
if not row:
raise HTTPException(status_code=404, detail=f"Заявка не найдена: {claim_id}")
# Обрабатываем payload - может быть строкой (JSONB) или уже dict
payload_raw = row.get('payload')
if isinstance(payload_raw, str):
try:
payload = json.loads(payload_raw) if payload_raw else {}
except (json.JSONDecodeError, TypeError):
payload = {}
elif isinstance(payload_raw, dict):
payload = payload_raw
else:
payload = {}
# Извлекаем claim_id из payload, если его нет в row
claim_id_from_payload = payload.get('claim_id') if isinstance(payload, dict) else None
final_claim_id = row.get('claim_id') or claim_id_from_payload or str(row['id'])
logger.info(f"✅ Загружены данные визарда: claim_id={final_claim_id}, has_wizard_plan={payload.get('wizard_plan') is not None}")
return {
"success": True,
"claim_id": final_claim_id,
"session_token": row.get('session_token'),
"unified_id": row.get('unified_id'),
"status_code": row.get('status_code'),
"channel": row.get('channel'),
"wizard_plan": payload.get('wizard_plan'),
"problem_description": payload.get('problem_description'),
"wizard_answers": payload.get('answers'),
"answers_prefill": payload.get('answers_prefill'),
"documents_meta": payload.get('documents_meta', []),
"ai_agent1_facts": payload.get('ai_agent1_facts'),
"ai_agent13_rag": payload.get('ai_agent13_rag'),
"coverage_report": payload.get('coverage_report'),
"phone": payload.get('phone'),
"email": payload.get('email'),
"created_at": row['created_at'].isoformat() if row.get('created_at') else None,
"updated_at": row['updated_at'].isoformat() if row.get('updated_at') else None,
}
except HTTPException:
raise
except Exception as e:
logger.exception("❌ Ошибка при загрузке данных визарда")
raise HTTPException(status_code=500, detail=f"Ошибка при загрузке данных визарда: {str(e)}")
@router.post("/description")
async def publish_ticket_form_description(payload: TicketFormDescriptionRequest):
"""

View File

@@ -8,6 +8,7 @@ from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Dict, Any
from app.services.redis_service import redis_service
from app.services.database import db
import logging
logger = logging.getLogger(__name__)
@@ -29,16 +30,18 @@ async def publish_event(task_id: str, event: EventPublish):
"""
Публикация события в Redis канал
Используется n8n для отправки событий (OCR, AI и т.д.)
Используется n8n для отправки событий (OCR, AI, wizard и т.д.)
Args:
task_id: ID задачи
task_id: Session token (например, sess-1763201209156-hyjye5u9h)
Используется для формирования канала ocr_events:{session_token}
event: Данные события
Returns:
Статус публикации
"""
try:
# task_id на самом деле это session_token
channel = f"ocr_events:{task_id}"
event_data = {
"event_type": event.event_type,
@@ -71,18 +74,21 @@ async def publish_event(task_id: str, event: EventPublish):
@router.get("/events/{task_id}")
async def stream_events(task_id: str):
"""
SSE стрим событий обработки OCR
SSE стрим событий обработки OCR, AI, wizard и т.д.
Args:
task_id: ID задачи
task_id: Session token (например, sess-1763201209156-hyjye5u9h)
Используется для формирования канала ocr_events:{session_token}
Фронтенд подключается через EventSource к этому эндпоинту
Returns:
StreamingResponse с событиями
"""
logger.info(f"🚀 SSE connection requested for task_id: {task_id}")
logger.info(f"🚀 SSE connection requested for session_token: {task_id}")
async def event_generator():
"""Генератор событий из Redis Pub/Sub"""
# task_id на самом деле это session_token
channel = f"ocr_events:{task_id}"
# Подписываемся на канал Redis
@@ -117,6 +123,90 @@ async def stream_events(task_id: str):
# Формат уже плоский (от backend API или старых источников)
actual_event = event
# ✅ Обработка формата от n8n: если пришёл объект с claim_id, но без event_type
# Это значит, что n8n пушит минимальный payload для wizard_ready
logger.info(f"🔍 Checking event: has event_type={bool(actual_event.get('event_type'))}, has claim_id={bool(actual_event.get('claim_id'))}")
if not actual_event.get('event_type') and actual_event.get('claim_id'):
logger.info(f"📦 Detected minimal wizard payload (no event_type), wrapping for claim_id={actual_event.get('claim_id')}")
# Обёртываем в правильный формат
actual_event = {
'event_type': 'wizard_ready',
'status': 'ready',
'message': 'Wizard plan готов',
'data': actual_event, # Весь объект становится data
'timestamp': actual_event.get('timestamp') or None
}
logger.info(f"✅ Wrapped minimal payload into wizard_ready event")
# Обработка события wizard_ready: загружаем данные из PostgreSQL
if actual_event.get('event_type') == 'wizard_ready' and actual_event.get('data', {}).get('claim_id'):
claim_id = actual_event['data']['claim_id']
logger.info(f"🔍 Wizard ready event received, loading data for claim_id={claim_id}")
try:
# Загружаем данные из PostgreSQL
query = """
SELECT
id,
payload->>'claim_id' as claim_id,
session_token,
unified_id,
status_code,
channel,
payload,
created_at,
updated_at
FROM clpr_claims
WHERE (payload->>'claim_id' = $1 OR id::text = $1)
LIMIT 1
"""
row = await db.fetch_one(query, claim_id)
if row:
# Обрабатываем payload - может быть строкой (JSONB) или уже dict
payload_raw = row.get('payload')
if isinstance(payload_raw, str):
try:
payload = json.loads(payload_raw) if payload_raw else {}
except (json.JSONDecodeError, TypeError):
payload = {}
elif isinstance(payload_raw, dict):
payload = payload_raw
else:
payload = {}
# Извлекаем claim_id из payload, если его нет в row
claim_id_from_payload = payload.get('claim_id') if isinstance(payload, dict) else None
final_claim_id = row.get('claim_id') or claim_id_from_payload or str(row['id'])
# Обогащаем событие полными данными из PostgreSQL
# Добавляем данные и в data, и в корень для совместимости с фронтендом
actual_event['data'] = {
**actual_event.get('data', {}),
'wizard_plan': payload.get('wizard_plan'),
'problem_description': payload.get('problem_description'),
'wizard_answers': payload.get('answers'),
'answers_prefill': payload.get('answers_prefill'),
'documents_meta': payload.get('documents_meta', []),
'ai_agent1_facts': payload.get('ai_agent1_facts'),
'ai_agent13_rag': payload.get('ai_agent13_rag'),
'coverage_report': payload.get('coverage_report'),
'phone': payload.get('phone'),
'email': payload.get('email'),
}
# Также добавляем wizard_plan в корень для совместимости с фронтендом
actual_event['wizard_plan'] = payload.get('wizard_plan')
actual_event['answers_prefill'] = payload.get('answers_prefill')
actual_event['coverage_report'] = payload.get('coverage_report')
logger.info(f"✅ Wizard data loaded from PostgreSQL for claim_id={final_claim_id}, has_wizard_plan={payload.get('wizard_plan') is not None}")
else:
logger.warning(f"⚠️ Claim not found in PostgreSQL: claim_id={claim_id}")
except Exception as e:
logger.error(f"❌ Error loading wizard data from PostgreSQL: {e}")
# Отправляем событие клиенту (плоский формат)
event_json = json.dumps(actual_event, ensure_ascii=False)
logger.info(f"📤 Sending event to client: {actual_event.get('status', 'unknown')}")

View File

@@ -0,0 +1,193 @@
"""
Session management API endpoints
Обеспечивает управление сессиями пользователей через Redis:
- Верификация существующей сессии
- Logout (удаление сессии)
"""
import json
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
import redis.asyncio as redis
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/v1/session", tags=["session"])
# Redis connection (используем существующее подключение)
redis_client: Optional[redis.Redis] = None
def init_redis(redis_conn: redis.Redis):
"""Initialize Redis connection"""
global redis_client
redis_client = redis_conn
class SessionVerifyRequest(BaseModel):
session_token: str
class SessionVerifyResponse(BaseModel):
success: bool
valid: bool
unified_id: Optional[str] = None
phone: Optional[str] = None
contact_id: Optional[str] = None
verified_at: Optional[str] = None
expires_in_seconds: Optional[int] = None
class SessionLogoutRequest(BaseModel):
session_token: str
class SessionLogoutResponse(BaseModel):
success: bool
message: str
@router.post("/verify", response_model=SessionVerifyResponse)
async def verify_session(request: SessionVerifyRequest):
"""
Проверить валидность сессии по session_token
Используется при загрузке страницы, чтобы восстановить сессию пользователя.
Если сессия валидна - возвращаем unified_id, phone и другие данные.
"""
try:
if not redis_client:
raise HTTPException(status_code=500, detail="Redis connection not initialized")
session_key = f"session:{request.session_token}"
logger.info(f"🔍 Проверка сессии: {session_key}")
# Получаем данные сессии из Redis
session_data_raw = await redis_client.get(session_key)
if not session_data_raw:
logger.info(f"❌ Сессия не найдена или истекла: {session_key}")
return SessionVerifyResponse(
success=True,
valid=False
)
# Парсим данные сессии
session_data = json.loads(session_data_raw)
# Получаем TTL (оставшееся время жизни)
ttl = await redis_client.ttl(session_key)
logger.info(f"✅ Сессия валидна: unified_id={session_data.get('unified_id')}, TTL={ttl}s")
return SessionVerifyResponse(
success=True,
valid=True,
unified_id=session_data.get('unified_id'),
phone=session_data.get('phone'),
contact_id=session_data.get('contact_id'),
verified_at=session_data.get('verified_at'),
expires_in_seconds=ttl if ttl > 0 else None
)
except json.JSONDecodeError as e:
logger.error(f"❌ Ошибка парсинга данных сессии: {e}")
return SessionVerifyResponse(
success=True,
valid=False
)
except Exception as e:
logger.exception("❌ Ошибка проверки сессии")
raise HTTPException(status_code=500, detail=f"Ошибка проверки сессии: {str(e)}")
@router.post("/logout", response_model=SessionLogoutResponse)
async def logout_session(request: SessionLogoutRequest):
"""
Выход из сессии (удаление session_token из Redis)
Используется при клике на кнопку "Выход".
"""
try:
if not redis_client:
raise HTTPException(status_code=500, detail="Redis connection not initialized")
session_key = f"session:{request.session_token}"
logger.info(f"🚪 Выход из сессии: {session_key}")
# Удаляем сессию из Redis
deleted = await redis_client.delete(session_key)
if deleted > 0:
logger.info(f"✅ Сессия удалена: {session_key}")
return SessionLogoutResponse(
success=True,
message="Выход выполнен успешно"
)
else:
logger.info(f"⚠️ Сессия не найдена (возможно, уже удалена): {session_key}")
return SessionLogoutResponse(
success=True,
message="Сессия уже завершена"
)
except Exception as e:
logger.exception("❌ Ошибка при выходе из сессии")
raise HTTPException(status_code=500, detail=f"Ошибка при выходе: {str(e)}")
class SessionCreateRequest(BaseModel):
session_token: str
unified_id: str
phone: str
contact_id: str
ttl_hours: int = 24
@router.post("/create")
async def create_session(request: SessionCreateRequest):
"""
Создать новую сессию (вызывается после успешной SMS верификации)
Обычно вызывается из Step1Phone после получения данных от n8n.
"""
try:
if not redis_client:
raise HTTPException(status_code=500, detail="Redis connection not initialized")
session_key = f"session:{request.session_token}"
session_data = {
'unified_id': request.unified_id,
'phone': request.phone,
'contact_id': request.contact_id,
'verified_at': datetime.utcnow().isoformat(),
'expires_at': (datetime.utcnow() + timedelta(hours=request.ttl_hours)).isoformat()
}
# Сохраняем в Redis с TTL
await redis_client.setex(
session_key,
request.ttl_hours * 3600, # TTL в секундах
json.dumps(session_data)
)
logger.info(f"✅ Сессия создана: {session_key}, unified_id={request.unified_id}, TTL={request.ttl_hours}h")
return {
'success': True,
'session_token': request.session_token,
'expires_in_seconds': request.ttl_hours * 3600
}
except Exception as e:
logger.exception("❌ Ошибка создания сессии")
raise HTTPException(status_code=500, detail=f"Ошибка создания сессии: {str(e)}")