""" Сервис для работы с n8n API """ import httpx import logging from typing import Optional from ..config import settings from ..services.redis_service import redis_service logger = logging.getLogger(__name__) # Workflow ID для ticket_form:description WORKFLOW_ID = "b4K4u851b4JFivyD" N8N_URL = "https://n8n.clientright.pro" MIN_RESTART_INTERVAL = 300 # Минимум 5 минут между перезапусками MAX_RETRY_ATTEMPTS = 2 # Максимум попыток перезапуска подряд async def check_workflow_status() -> Optional[dict]: """ Проверка статуса workflow через n8n API Returns: dict с данными workflow или None при ошибке """ if not settings.n8n_api_key: logger.warning("⚠️ N8N_API_KEY не настроен") return None headers = _get_headers() if not headers: return None try: async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get( f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}", headers=headers ) if response.status_code == 200: return response.json() else: logger.warning(f"⚠️ n8n API вернул статус {response.status_code}") return None except Exception as e: logger.error(f"❌ Ошибка при проверке статуса workflow: {e}") return None async def restart_workflow() -> bool: """ Перезапуск workflow через n8n API с улучшенной обработкой зависших состояний Returns: True если успешно, False при ошибке """ if not settings.n8n_api_key: logger.error("❌ N8N_API_KEY не настроен! Не могу перезапустить workflow") return False headers = _get_headers() if not headers: return False import asyncio try: # Увеличиваем таймаут для обработки зависших workflow async with httpx.AsyncClient(timeout=30.0) as client: # Шаг 1: Проверяем текущий статус logger.info(f"🔍 Проверяю текущий статус workflow {WORKFLOW_ID}...") status_response = await client.get( f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}", headers=headers ) if status_response.status_code == 200: workflow_data = status_response.json() is_active = workflow_data.get("active", False) logger.info(f"📊 Workflow активен: {is_active}") # Шаг 2: Деактивировать workflow (даже если уже неактивен - для сброса состояния) logger.info(f"🔄 Деактивирую workflow {WORKFLOW_ID}...") try: deactivate_response = await client.post( f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}/deactivate", headers=headers, timeout=15.0 # Отдельный таймаут для деактивации ) if deactivate_response.status_code in [200, 404]: logger.info("✅ Workflow деактивирован") else: logger.warning( f"⚠️ Неожиданный статус при деактивации: " f"{deactivate_response.status_code} - {deactivate_response.text[:200]}" ) # Продолжаем даже если деактивация не удалась - возможно workflow уже неактивен except httpx.TimeoutException: logger.warning("⏱️ Таймаут при деактивации workflow (возможно завис)") # Продолжаем попытку активации - иногда помогает except Exception as e: logger.warning(f"⚠️ Ошибка при деактивации: {e}, продолжаю...") # Задержка перед активацией (увеличена для стабильности) await asyncio.sleep(3) # Шаг 3: Активировать workflow logger.info(f"🔄 Активирую workflow {WORKFLOW_ID}...") try: activate_response = await client.post( f"{N8N_URL}/api/v1/workflows/{WORKFLOW_ID}/activate", headers=headers, timeout=15.0 # Отдельный таймаут для активации ) if activate_response.status_code == 200: logger.info("✅ Workflow активирован") # Дополнительная задержка для инициализации trigger node await asyncio.sleep(2) # После успешного перезапуска отправляем сообщения из буфера await _send_buffered_messages() return True else: logger.error( f"❌ Ошибка активации workflow: " f"{activate_response.status_code} - {activate_response.text[:200]}" ) return False except httpx.TimeoutException: logger.error("⏱️ Таймаут при активации workflow - возможно n8n перегружен") return False except Exception as e: logger.error(f"❌ Ошибка при активации workflow: {e}") return False except httpx.TimeoutException: logger.error("⏱️ Общий таймаут при перезапуске workflow") return False except Exception as e: logger.error(f"❌ Неожиданная ошибка при перезапуске workflow: {e}", exc_info=True) return False async def _send_buffered_messages(): """ Отправить все сообщения из буфера после восстановления workflow """ try: buffer_key = "description" # Буфер для ticket_form:description messages = await redis_service.buffer_get_all(buffer_key) if not messages: logger.info("📭 Буфер пуст, нечего отправлять") return logger.info(f"📤 Отправляю {len(messages)} сообщений из буфера...") import json channel = f"{settings.redis_prefix}description" sent_count = 0 failed_count = 0 for message in messages: try: event_json = json.dumps(message.get("event", message), ensure_ascii=False) subscribers = await redis_service.publish(channel, event_json) if subscribers > 0: sent_count += 1 logger.info( f"✅ Буферированное сообщение отправлено: " f"session_id={message.get('session_id', 'unknown')}, " f"subscribers={subscribers}" ) else: failed_count += 1 logger.warning( f"⚠️ Буферированное сообщение не доставлено " f"(подписчиков нет): session_id={message.get('session_id', 'unknown')}" ) # Возвращаем обратно в буфер если не доставлено await redis_service.buffer_push(buffer_key, message) except Exception as e: failed_count += 1 logger.error(f"❌ Ошибка отправки буферизованного сообщения: {e}") # Возвращаем обратно в буфер await redis_service.buffer_push(buffer_key, message) logger.info( f"📊 Результат отправки буфера: {sent_count} отправлено, {failed_count} не доставлено" ) except Exception as e: logger.exception(f"❌ Ошибка при отправке буферизованных сообщений: {e}") def _get_headers() -> Optional[dict]: """Получить заголовки для n8n API""" if not settings.n8n_api_key: return None api_key = settings.n8n_api_key # Убираем "Bearer " если есть - n8n API использует X-N8N-API-KEY clean_key = api_key.replace("Bearer ", "").strip() # n8n API принимает ключ в заголовке X-N8N-API-KEY return {"X-N8N-API-KEY": clean_key}