Files
aiform_dev/backend/app/services/n8n_service.py

180 lines
6.8 KiB
Python
Raw Normal View History

"""
Сервис для работы с n8n API
"""
import httpx
import logging
from typing import Optional
from ..config import settings
from ..services.redis_service import redis_service
logger = logging.getLogger(__name__)
# Workflow ID для ticket_form:description
WORKFLOW_ID = "b4K4u851b4JFivyD"
N8N_URL = "https://n8n.clientright.pro"
MIN_RESTART_INTERVAL = 300 # Минимум 5 минут между перезапусками
async def check_workflow_status() -> Optional[dict]:
"""
Проверка статуса workflow через n8n API
Returns:
dict с данными workflow или None при ошибке
"""
if not settings.n8n_api_key:
logger.warning("⚠️ N8N_API_KEY не настроен")
return None
headers = _get_headers()
if not headers:
return None
try:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.get(
f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}",
headers=headers
)
if response.status_code == 200:
return response.json()
else:
logger.warning(f"⚠️ n8n API вернул статус {response.status_code}")
return None
except Exception as e:
logger.error(f"❌ Ошибка при проверке статуса workflow: {e}")
return None
async def restart_workflow() -> bool:
"""
Перезапуск workflow через n8n API
Returns:
True если успешно, False при ошибке
"""
if not settings.n8n_api_key:
logger.error("❌ N8N_API_KEY не настроен! Не могу перезапустить workflow")
return False
headers = _get_headers()
if not headers:
return False
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# Шаг 1: Деактивировать workflow
logger.info(f"🔄 Деактивирую workflow {WORKFLOW_ID}...")
deactivate_response = await client.post(
f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}/deactivate",
headers=headers
)
if deactivate_response.status_code not in [200, 404]:
logger.warning(
f"⚠️ Неожиданный статус при деактивации: "
f"{deactivate_response.status_code}"
)
else:
logger.info("✅ Workflow деактивирован")
# Задержка перед активацией
import asyncio
await asyncio.sleep(2)
# Шаг 2: Активировать workflow
logger.info(f"🔄 Активирую workflow {WORKFLOW_ID}...")
activate_response = await client.post(
f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}/activate",
headers=headers
)
if activate_response.status_code == 200:
logger.info("✅ Workflow активирован")
# После успешного перезапуска отправляем сообщения из буфера
await _send_buffered_messages()
return True
else:
logger.error(
f"❌ Ошибка активации workflow: "
f"{activate_response.status_code} - {activate_response.text[:200]}"
)
return False
except Exception as e:
logger.error(f"❌ Неожиданная ошибка при перезапуске workflow: {e}")
return False
async def _send_buffered_messages():
"""
Отправить все сообщения из буфера после восстановления workflow
"""
try:
buffer_key = "description" # Буфер для ticket_form:description
messages = await redis_service.buffer_get_all(buffer_key)
if not messages:
logger.info("📭 Буфер пуст, нечего отправлять")
return
logger.info(f"📤 Отправляю {len(messages)} сообщений из буфера...")
import json
channel = f"{settings.redis_prefix}description"
sent_count = 0
failed_count = 0
for message in messages:
try:
event_json = json.dumps(message.get("event", message), ensure_ascii=False)
subscribers = await redis_service.publish(channel, event_json)
if subscribers > 0:
sent_count += 1
logger.info(
f"✅ Буферированное сообщение отправлено: "
f"session_id={message.get('session_id', 'unknown')}, "
f"subscribers={subscribers}"
)
else:
failed_count += 1
logger.warning(
f"⚠️ Буферированное сообщение не доставлено "
f"(подписчиков нет): session_id={message.get('session_id', 'unknown')}"
)
# Возвращаем обратно в буфер если не доставлено
await redis_service.buffer_push(buffer_key, message)
except Exception as e:
failed_count += 1
logger.error(f"❌ Ошибка отправки буферизованного сообщения: {e}")
# Возвращаем обратно в буфер
await redis_service.buffer_push(buffer_key, message)
logger.info(
f"📊 Результат отправки буфера: {sent_count} отправлено, {failed_count} не доставлено"
)
except Exception as e:
logger.exception(f"❌ Ошибка при отправке буферизованных сообщений: {e}")
def _get_headers() -> Optional[dict]:
"""Получить заголовки для n8n API"""
if not settings.n8n_api_key:
return None
api_key = settings.n8n_api_key
# Убираем "Bearer " если есть - n8n API использует X-N8N-API-KEY
clean_key = api_key.replace("Bearer ", "").strip()
# n8n API принимает ключ в заголовке X-N8N-API-KEY
return {"X-N8N-API-KEY": clean_key}