Production fixes: n8n workflow auto-restart, user-friendly messages, fixed navigation buttons

This commit is contained in:
AI Assistant
2025-12-29 01:19:19 +03:00
parent 080e7ec105
commit 30774db18c
16 changed files with 539 additions and 353 deletions

View File

@@ -1,7 +1,7 @@
"""
Claims API Routes - Обработка заявок
"""
from fastapi import APIRouter, HTTPException, Request, Query
from fastapi import APIRouter, HTTPException, Request, Query, BackgroundTasks
from typing import Optional, List
import httpx
from .models import (
@@ -16,6 +16,7 @@ import logging
from ..services.redis_service import redis_service
from ..services.database import db
from ..services.crm_mysql_service import crm_mysql_service
from ..services.n8n_service import check_workflow_status, restart_workflow, MIN_RESTART_INTERVAL
from ..config import settings
router = APIRouter(prefix="/api/v1/claims", tags=["Claims"])
@@ -907,8 +908,55 @@ async def load_wizard_data(claim_id: str):
raise HTTPException(status_code=500, detail=f"Ошибка при загрузке данных визарда: {str(e)}")
async def _check_and_restart_workflow_if_needed(channel: str):
"""
Проверяет и перезапускает workflow если нужно (в фоне)
Защита от частых перезапусков через Redis lock
"""
try:
# Проверяем lock - если недавно перезапускали, пропускаем
lock_key = f"workflow_restart_lock:{channel}"
lock_value = await redis_service.get(lock_key)
if lock_value:
logger.info(f"⏸️ Workflow недавно перезапускался, пропускаем (lock active)")
return
# Проверяем статус workflow
workflow_data = await check_workflow_status()
if workflow_data:
is_active = workflow_data.get("active", False)
if not is_active:
logger.warning(f"⚠️ Workflow НЕ активен! Активирую и перезапускаю...")
# Workflow выключен — нужно его ВКЛЮЧИТЬ
else:
logger.info(
f"⚠️ Workflow активен, но нет подписчиков. Перезапускаю workflow..."
)
# Устанавливаем lock на MIN_RESTART_INTERVAL секунд
await redis_service.set(lock_key, "1", expire=MIN_RESTART_INTERVAL)
# Перезапускаем
success = await restart_workflow()
if success:
logger.info("✅ Workflow успешно перезапущен")
else:
logger.error("Не удалось перезапустить workflow")
else:
logger.warning("⚠️ Не удалось проверить статус workflow, пропускаем перезапуск")
except Exception as e:
logger.exception(f"❌ Ошибка при проверке/перезапуске workflow: {e}")
@router.post("/description")
async def publish_ticket_form_description(payload: TicketFormDescriptionRequest):
async def publish_ticket_form_description(
payload: TicketFormDescriptionRequest,
background_tasks: BackgroundTasks
):
"""
Публикует свободное описание проблемы в Redis канал ticket_form:description
(слушается воркфлоу в n8n)
@@ -969,8 +1017,21 @@ async def publish_ticket_form_description(payload: TicketFormDescriptionRequest)
logger.warning(
f"⚠️ WARNING: No subscribers on channel {channel}! "
f"n8n workflow is not listening to this channel. "
f"Event was published but will be lost."
f"Saving message to buffer and restarting workflow..."
)
# Сохраняем сообщение в буфер для последующей отправки
buffer_message = {
"session_id": payload.session_id,
"claim_id": payload.claim_id,
"event": event,
"timestamp": datetime.utcnow().isoformat(),
}
await redis_service.buffer_push("description", buffer_message)
logger.info(f"💾 Сообщение сохранено в буфер: session_id={payload.session_id}")
# Запускаем проверку и перезапуск workflow в фоне
background_tasks.add_task(_check_and_restart_workflow_if_needed, channel)
# Дополнительная проверка: логируем полный event для отладки
logger.debug(
@@ -980,11 +1041,27 @@ async def publish_ticket_form_description(payload: TicketFormDescriptionRequest)
"event": event,
},
)
return {
# Формируем ответ с информацией о подписчиках
response_data = {
"success": True,
"channel": channel,
"subscribers_count": subscribers_count,
"event": event,
}
# Если подписчиков нет - сообщаем что обработка займёт больше времени
if subscribers_count == 0:
buffer_size = await redis_service.buffer_size("description")
response_data["warning"] = (
"Обработка вашего обращения займёт немного больше времени. "
"Идёт автоматическое восстановление системы. "
"Ваше сообщение сохранено и будет обработано в ближайшее время."
)
response_data["workflow_recovering"] = True
response_data["message_buffered"] = True
response_data["buffer_size"] = buffer_size
return response_data
except Exception as e:
logger.exception("❌ Failed to publish ticket form description")
raise HTTPException(

View File

@@ -171,8 +171,10 @@ class Settings(BaseSettings):
return self.cors_origins
# ============================================
# N8N WEBHOOKS (скрыты от фронтенда)
# N8N API & WEBHOOKS
# ============================================
n8n_url: str = "https://n8n.clientright.pro"
n8n_api_key: str = "" # Нужно задать в .env
n8n_policy_check_webhook: str = ""
n8n_file_upload_webhook: str = ""
n8n_create_contact_webhook: str = ""

View File

@@ -0,0 +1,179 @@
"""
Сервис для работы с n8n API
"""
import httpx
import logging
from typing import Optional
from ..config import settings
from ..services.redis_service import redis_service
logger = logging.getLogger(__name__)
# Workflow ID для ticket_form:description
WORKFLOW_ID = "b4K4u851b4JFivyD"
N8N_URL = "https://n8n.clientright.pro"
MIN_RESTART_INTERVAL = 300 # Минимум 5 минут между перезапусками
async def check_workflow_status() -> Optional[dict]:
"""
Проверка статуса workflow через n8n API
Returns:
dict с данными workflow или None при ошибке
"""
if not settings.n8n_api_key:
logger.warning("⚠️ N8N_API_KEY не настроен")
return None
headers = _get_headers()
if not headers:
return None
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(
f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}",
headers=headers
)
if response.status_code == 200:
return response.json()
else:
logger.warning(f"⚠️ n8n API вернул статус {response.status_code}")
return None
except Exception as e:
logger.error(f"❌ Ошибка при проверке статуса workflow: {e}")
return None
async def restart_workflow() -> bool:
"""
Перезапуск workflow через n8n API
Returns:
True если успешно, False при ошибке
"""
if not settings.n8n_api_key:
logger.error("❌ N8N_API_KEY не настроен! Не могу перезапустить workflow")
return False
headers = _get_headers()
if not headers:
return False
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# Шаг 1: Деактивировать workflow
logger.info(f"🔄 Деактивирую workflow {WORKFLOW_ID}...")
deactivate_response = await client.post(
f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}/deactivate",
headers=headers
)
if deactivate_response.status_code not in [200, 404]:
logger.warning(
f"⚠️ Неожиданный статус при деактивации: "
f"{deactivate_response.status_code}"
)
else:
logger.info("✅ Workflow деактивирован")
# Задержка перед активацией
import asyncio
await asyncio.sleep(2)
# Шаг 2: Активировать workflow
logger.info(f"🔄 Активирую workflow {WORKFLOW_ID}...")
activate_response = await client.post(
f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}/activate",
headers=headers
)
if activate_response.status_code == 200:
logger.info("✅ Workflow активирован")
# После успешного перезапуска отправляем сообщения из буфера
await _send_buffered_messages()
return True
else:
logger.error(
f"❌ Ошибка активации workflow: "
f"{activate_response.status_code} - {activate_response.text[:200]}"
)
return False
except Exception as e:
logger.error(f"❌ Неожиданная ошибка при перезапуске workflow: {e}")
return False
async def _send_buffered_messages():
"""
Отправить все сообщения из буфера после восстановления workflow
"""
try:
buffer_key = "description" # Буфер для ticket_form:description
messages = await redis_service.buffer_get_all(buffer_key)
if not messages:
logger.info("📭 Буфер пуст, нечего отправлять")
return
logger.info(f"📤 Отправляю {len(messages)} сообщений из буфера...")
import json
channel = f"{settings.redis_prefix}description"
sent_count = 0
failed_count = 0
for message in messages:
try:
event_json = json.dumps(message.get("event", message), ensure_ascii=False)
subscribers = await redis_service.publish(channel, event_json)
if subscribers > 0:
sent_count += 1
logger.info(
f"✅ Буферированное сообщение отправлено: "
f"session_id={message.get('session_id', 'unknown')}, "
f"subscribers={subscribers}"
)
else:
failed_count += 1
logger.warning(
f"⚠️ Буферированное сообщение не доставлено "
f"(подписчиков нет): session_id={message.get('session_id', 'unknown')}"
)
# Возвращаем обратно в буфер если не доставлено
await redis_service.buffer_push(buffer_key, message)
except Exception as e:
failed_count += 1
logger.error(f"❌ Ошибка отправки буферизованного сообщения: {e}")
# Возвращаем обратно в буфер
await redis_service.buffer_push(buffer_key, message)
logger.info(
f"📊 Результат отправки буфера: {sent_count} отправлено, {failed_count} не доставлено"
)
except Exception as e:
logger.exception(f"❌ Ошибка при отправке буферизованных сообщений: {e}")
def _get_headers() -> Optional[dict]:
"""Получить заголовки для n8n API"""
if not settings.n8n_api_key:
return None
api_key = settings.n8n_api_key
# Убираем "Bearer " если есть - n8n API использует X-N8N-API-KEY
clean_key = api_key.replace("Bearer ", "").strip()
# n8n API принимает ключ в заголовке X-N8N-API-KEY
return {"X-N8N-API-KEY": clean_key}

View File

@@ -2,7 +2,7 @@
Redis Service для кеширования, rate limiting, сессий
"""
import redis.asyncio as redis
from typing import Optional, Any
from typing import Optional, Any, List
import json
from ..config import settings
import logging
@@ -155,6 +155,58 @@ class RedisService:
async def cache_delete(self, cache_key: str):
"""Удалить из кеша"""
await self.delete(f"cache:{cache_key}")
# ============================================
# MESSAGE BUFFER (для буферизации сообщений при недоступности workflow)
# ============================================
async def buffer_push(self, buffer_key: str, message: dict):
"""
Добавить сообщение в буфер (очередь)
Args:
buffer_key: Имя буфера (например, "description")
message: Сообщение для буферизации
"""
full_key = f"{settings.redis_prefix}buffer:{buffer_key}"
await self.client.lpush(full_key, json.dumps(message))
# Устанавливаем TTL на буфер (24 часа)
await self.client.expire(full_key, 86400)
async def buffer_get_all(self, buffer_key: str) -> List[dict]:
"""
Получить все сообщения из буфера (и очистить буфер)
Args:
buffer_key: Имя буфера
Returns:
Список сообщений
"""
full_key = f"{settings.redis_prefix}buffer:{buffer_key}"
# Используем транзакцию для атомарности
pipe = self.client.pipeline()
pipe.lrange(full_key, 0, -1) # Получить все
pipe.delete(full_key) # Удалить буфер
results = await pipe.execute()
messages_data = results[0] if results else []
messages = []
for msg_str in messages_data:
try:
messages.append(json.loads(msg_str))
except json.JSONDecodeError:
logger.warning(f"⚠️ Не удалось распарсить сообщение из буфера: {msg_str}")
# Возвращаем в правильном порядке (FIFO - сначала старые)
return list(reversed(messages))
async def buffer_size(self, buffer_key: str) -> int:
"""Получить размер буфера"""
full_key = f"{settings.redis_prefix}buffer:{buffer_key}"
return await self.client.llen(full_key)
# Глобальный экземпляр