From f770bd0e43503a6c896ba1cef22457d0284aee7a Mon Sep 17 00:00:00 2001 From: Fedor Date: Mon, 10 Nov 2025 22:58:35 +0300 Subject: [PATCH] Fix AI Drawer SSE Redis caching --- aiassist/ai_sse.php | 416 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 416 insertions(+) create mode 100644 aiassist/ai_sse.php diff --git a/aiassist/ai_sse.php b/aiassist/ai_sse.php new file mode 100644 index 00000000..d0d8c2d4 --- /dev/null +++ b/aiassist/ai_sse.php @@ -0,0 +1,416 @@ + $taskId, + 'response' => $responseText, + 'status' => $status, + 'timestamp' => date('Y-m-d H:i:s') + ]; + $cacheValue = json_encode($payload, JSON_UNESCAPED_UNICODE); + ai_sse_debug_log("{$taskId}: caching response (status={$status})"); + try { + if (class_exists('Redis')) { + $cacheRedis = new Redis(); + if (!$cacheRedis->connect('crm.clientright.ru', 6379)) { + throw new Exception('Redis cache connection failed'); + } + $cacheRedis->auth('CRM_Redis_Pass_2025_Secure!'); + $result = $cacheRedis->setex($cacheKey, 300, $cacheValue); + $cacheRedis->close(); + ai_sse_debug_log("{$taskId}: cache set via phpredis result=" . var_export($result, true)); + return (bool)$result; + } + + require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php'; + $cacheRedis = new Predis\Client([ + 'scheme' => 'tcp', + 'host' => 'crm.clientright.ru', + 'port' => 6379, + 'password' => 'CRM_Redis_Pass_2025_Secure!', + ]); + $result = $cacheRedis->setex($cacheKey, 300, $cacheValue); + ai_sse_debug_log("{$taskId}: cache set via Predis result=" . var_export($result, true)); + return (bool)$result; + } catch (Throwable $e) { + $message = "Failed to cache response for {$taskId}: " . $e->getMessage(); + error_log("[AI SSE] {$message}"); + ai_sse_debug_log("{$taskId}: {$message}"); + return false; + } + } +} + +// Отправляем начальный padding для Nginx +echo str_repeat(' ', 4096); +echo "\n\n"; +flush(); + +// Функция для отправки события +function sendSSE($type, $data) { + // Для кастомных событий используем event:, для обычных data: + if (in_array($type, ['connected', 'response', 'error', 'heartbeat'])) { + echo "event: {$type}\n"; + } + echo "data: " . json_encode([ + 'type' => $type, + 'data' => $data, + 'timestamp' => date('Y-m-d H:i:s') + ], JSON_UNESCAPED_UNICODE) . "\n\n"; + flush(); +} + +try { + // Получаем task_id из GET параметра + $taskId = $_GET['task_id'] ?? null; + + if (!$taskId) { + sendSSE('error', ['message' => 'Missing task_id parameter']); + exit(); + } + + error_log("[AI SSE] Starting SSE connection for task_id: {$taskId}"); + + // Отправляем событие подключения + sendSSE('connected', [ + 'message' => 'Подключено к AI событиям', + 'task_id' => $taskId + ]); + + // Сначала проверяем - может ответ уже есть в Redis ключе (если публикация была до подписки) + $cacheKey = "ai:response:cache:{$taskId}"; + $cachedResponse = null; + + try { + if (class_exists('Redis')) { + $checkRedis = new Redis(); + if ($checkRedis->connect('crm.clientright.ru', 6379)) { + $checkRedis->auth('CRM_Redis_Pass_2025_Secure!'); + $cachedResponse = $checkRedis->get($cacheKey); + $checkRedis->close(); + } + } else { + require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php'; + $checkRedis = new Predis\Client([ + 'scheme' => 'tcp', + 'host' => 'crm.clientright.ru', + 'port' => 6379, + 'password' => 'CRM_Redis_Pass_2025_Secure!', + ]); + $cachedResponse = $checkRedis->get($cacheKey); + } + + if ($cachedResponse) { + error_log("[AI SSE] Found cached response in Redis key: {$cacheKey}"); + $responseData = json_decode($cachedResponse, true); + + if ($responseData && isset($responseData['response'])) { + sendSSE('response', [ + 'task_id' => $taskId, + 'response' => $responseData['response'] + ]); + exit(); + } elseif ($cachedResponse && !$responseData) { + // Если это просто строка + sendSSE('response', [ + 'task_id' => $taskId, + 'response' => $cachedResponse + ]); + exit(); + } + } + } catch (Exception $e) { + error_log("[AI SSE] Error checking cache: " . $e->getMessage()); + // Продолжаем с подпиской на канал + } + + // Подключаемся к Redis + if (class_exists('Redis')) { + // Используем расширение Redis + $redis = new Redis(); + if (!$redis->connect('crm.clientright.ru', 6379)) { + throw new Exception('Redis connection failed'); + } + $redis->auth('CRM_Redis_Pass_2025_Secure!'); + + // Отправляем начальное событие + sendSSE('connected', [ + 'message' => 'Подключено к AI событиям', + 'task_id' => $taskId + ]); + + // Подписываемся на канал для конкретного task_id + $channel = "ai:response:{$taskId}"; + error_log("[AI SSE] Subscribing to channel: {$channel}"); + + // Используем правильный метод subscribe для расширения Redis + $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // Без таймаута + + $redis->subscribe([$channel], function($redis, $channel, $message) use ($taskId) { + error_log("[AI SSE] Received message on channel {$channel}, length: " . strlen($message)); + error_log("[AI SSE] Message preview: " . substr($message, 0, 500)); + + // Декодируем событие (может быть JSON или просто строка) + $event = json_decode($message, true); + + // Если это не JSON, значит n8n отправил просто строку ответа - ОТЛИЧНО! + if (!$event || !is_array($event)) { + error_log("[AI SSE] Message is plain text (not JSON), treating as response"); + ai_sse_debug_log("{$taskId}: plain text message received"); + + $cacheStored = ai_sse_cache_response($taskId, $message); + if ($cacheStored) { + error_log("[AI SSE] Cached plain text response for {$taskId}"); + ai_sse_debug_log("{$taskId}: plain text response cached successfully"); + } else { + error_log("[AI SSE] Failed to cache plain text response for {$taskId}"); + } + + // Отправляем ответ клиенту + sendSSE('response', [ + 'task_id' => $taskId, + 'response' => $message // Используем сообщение как есть + ]); + error_log("[AI SSE] Response sent to client, unsubscribing"); + $redis->unsubscribe([$channel]); + return; + } + + error_log("[AI SSE] Decoded event: " . json_encode($event, JSON_UNESCAPED_UNICODE)); + + // Проверяем что это сообщение для нашего task_id + // Принимаем если task_id совпадает ИЛИ если это просто ответ (может быть без task_id) + $eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null; + $isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}"); + + if ($isOurTask) { + // Отправляем событие клиенту + if (isset($event['response']) && !empty($event['response'])) { + error_log("[AI SSE] Sending response event for task {$taskId}"); + ai_sse_debug_log("{$taskId}: JSON response received"); + + $cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed'); + if ($cacheStored) { + error_log("[AI SSE] Cached JSON response for {$taskId}"); + ai_sse_debug_log("{$taskId}: cached JSON response"); + } else { + error_log("[AI SSE] Failed to cache JSON response for {$taskId}"); + } + + sendSSE('response', [ + 'task_id' => $taskId, + 'response' => $event['response'] + ]); + // Отписываемся после получения ответа + $redis->unsubscribe([$channel]); + } elseif (isset($event['error']) && !empty($event['error'])) { + error_log("[AI SSE] Sending error event for task {$taskId}"); + sendSSE('error', [ + 'task_id' => $taskId, + 'error' => $event['error'] + ]); + // Отписываемся после получения ошибки + $redis->unsubscribe([$channel]); + } else { + error_log("[AI SSE] Event received but no response/error field. Event keys: " . implode(', ', array_keys($event))); + // Если есть другие поля, попробуем найти ответ + $possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null; + if ($possibleResponse) { + error_log("[AI SSE] Found response in alternative field"); + ai_sse_debug_log("{$taskId}: alternative response field detected"); + + $cacheStored = ai_sse_cache_response($taskId, $possibleResponse); + if ($cacheStored) { + error_log("[AI SSE] Cached alternative response for {$taskId}"); + ai_sse_debug_log("{$taskId}: cached alternative response"); + } else { + error_log("[AI SSE] Failed to cache alternative response for {$taskId}"); + } + + sendSSE('response', [ + 'task_id' => $taskId, + 'response' => $possibleResponse + ]); + $redis->unsubscribe([$channel]); + } + } + } else { + error_log("[AI SSE] Event task_id mismatch: expected {$taskId}, got {$eventTaskId}"); + } + + // Проверяем не отключился ли клиент + if (connection_aborted()) { + error_log("[AI SSE] Client disconnected, unsubscribing"); + $redis->unsubscribe([$channel]); + } + }); + + } else { + // Используем Predis через Composer + require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php'; + + $redis = new Predis\Client([ + 'scheme' => 'tcp', + 'host' => 'crm.clientright.ru', + 'port' => 6379, + 'password' => 'CRM_Redis_Pass_2025_Secure!', + 'database' => 0, + ]); + + // Отправляем начальное событие + sendSSE('connected', [ + 'message' => 'Подключено к AI событиям через Predis', + 'task_id' => $taskId + ]); + + // Подписываемся на канал + $channel = "ai:response:{$taskId}"; + $pubsub = $redis->pubSubLoop(); + $pubsub->subscribe($channel); + + $lastHeartbeat = time(); + + foreach ($pubsub as $message) { + // Heartbeat каждые 15 секунд + if (time() - $lastHeartbeat > 15) { + sendSSE('heartbeat', ['timestamp' => time()]); + $lastHeartbeat = time(); + } + + // Обрабатываем только сообщения + if ($message->kind === 'message') { + error_log("[AI SSE] Received message via Predis on channel {$channel}: " . substr($message->payload, 0, 200)); + + $event = json_decode($message->payload, true); + + // Если это не JSON, значит n8n отправил просто строку ответа + if (!$event || !is_array($event)) { + error_log("[AI SSE] Message is not JSON via Predis, treating as plain text response"); + ai_sse_debug_log("{$taskId}: plain text message via Predis"); + + $cacheStored = ai_sse_cache_response($taskId, $message->payload); + if ($cacheStored) { + error_log("[AI SSE] Cached plain text response via Predis for {$taskId}"); + ai_sse_debug_log("{$taskId}: cached plain text via Predis"); + } else { + error_log("[AI SSE] Failed to cache plain text response via Predis for {$taskId}"); + } + + // Если это просто строка - отправляем как ответ + sendSSE('response', [ + 'task_id' => $taskId, + 'response' => $message->payload // Используем сообщение как есть + ]); + $pubsub->unsubscribe($channel); + break; + } + + error_log("[AI SSE] Decoded event via Predis: " . json_encode($event, JSON_UNESCAPED_UNICODE)); + + // Проверяем что это сообщение для нашего task_id + $eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null; + $isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}"); + + if ($isOurTask) { + if (isset($event['response']) && !empty($event['response'])) { + error_log("[AI SSE] Sending response event via Predis for task {$taskId}"); + ai_sse_debug_log("{$taskId}: JSON response via Predis"); + + $cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed'); + if ($cacheStored) { + error_log("[AI SSE] Cached JSON response via Predis for {$taskId}"); + ai_sse_debug_log("{$taskId}: cached JSON via Predis"); + } else { + error_log("[AI SSE] Failed to cache JSON response via Predis for {$taskId}"); + } + + sendSSE('response', [ + 'task_id' => $taskId, + 'response' => $event['response'] + ]); + // Отписываемся после получения ответа + $pubsub->unsubscribe($channel); + break; + } elseif (isset($event['error']) && !empty($event['error'])) { + error_log("[AI SSE] Sending error event via Predis for task {$taskId}"); + sendSSE('error', [ + 'task_id' => $taskId, + 'error' => $event['error'] + ]); + // Отписываемся после получения ошибки + $pubsub->unsubscribe($channel); + break; + } else { + error_log("[AI SSE] Event received via Predis but no response/error field. Event keys: " . implode(', ', array_keys($event))); + // Если есть другие поля, попробуем найти ответ + $possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null; + if ($possibleResponse) { + error_log("[AI SSE] Found response in alternative field via Predis"); + ai_sse_debug_log("{$taskId}: alternative response via Predis"); + + $cacheStored = ai_sse_cache_response($taskId, $possibleResponse); + if ($cacheStored) { + error_log("[AI SSE] Cached alternative response via Predis for {$taskId}"); + ai_sse_debug_log("{$taskId}: cached alt response via Predis"); + } else { + error_log("[AI SSE] Failed to cache alternative response via Predis for {$taskId}"); + } + + sendSSE('response', [ + 'task_id' => $taskId, + 'response' => $possibleResponse + ]); + $pubsub->unsubscribe($channel); + break; + } + } + } else { + error_log("[AI SSE] Event task_id mismatch via Predis: expected {$taskId}, got {$eventTaskId}"); + } + } + + // Проверяем не отключился ли клиент + if (connection_aborted()) { + $pubsub->unsubscribe($channel); + break; + } + } + } + +} catch (Exception $e) { + error_log("[AI SSE] Error: " . $e->getMessage()); + sendSSE('error', ['message' => 'Redis error: ' . $e->getMessage()]); +} +?> +