$taskId, 'response' => $responseText, 'status' => $status, 'timestamp' => date('Y-m-d H:i:s') ]; $cacheValue = json_encode($payload, JSON_UNESCAPED_UNICODE); ai_sse_debug_log("{$taskId}: caching response (status={$status})"); try { if (class_exists('Redis')) { $cacheRedis = new Redis(); if (!$cacheRedis->connect('crm.clientright.ru', 6379)) { throw new Exception('Redis cache connection failed'); } $cacheRedis->auth('CRM_Redis_Pass_2025_Secure!'); $result = $cacheRedis->setex($cacheKey, 300, $cacheValue); $cacheRedis->close(); ai_sse_debug_log("{$taskId}: cache set via phpredis result=" . var_export($result, true)); return (bool)$result; } require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php'; $cacheRedis = new Predis\Client([ 'scheme' => 'tcp', 'host' => 'crm.clientright.ru', 'port' => 6379, 'password' => 'CRM_Redis_Pass_2025_Secure!', ]); $result = $cacheRedis->setex($cacheKey, 300, $cacheValue); ai_sse_debug_log("{$taskId}: cache set via Predis result=" . var_export($result, true)); return (bool)$result; } catch (Throwable $e) { $message = "Failed to cache response for {$taskId}: " . $e->getMessage(); error_log("[AI SSE] {$message}"); ai_sse_debug_log("{$taskId}: {$message}"); return false; } } } // Отправляем начальный padding для Nginx echo str_repeat(' ', 4096); echo "\n\n"; flush(); // Функция для отправки события function sendSSE($type, $data) { // Для кастомных событий используем event:, для обычных data: if (in_array($type, ['connected', 'response', 'error', 'heartbeat'])) { echo "event: {$type}\n"; } echo "data: " . json_encode([ 'type' => $type, 'data' => $data, 'timestamp' => date('Y-m-d H:i:s') ], JSON_UNESCAPED_UNICODE) . "\n\n"; flush(); } try { // Получаем task_id из GET параметра $taskId = $_GET['task_id'] ?? null; if (!$taskId) { sendSSE('error', ['message' => 'Missing task_id parameter']); exit(); } error_log("[AI SSE] Starting SSE connection for task_id: {$taskId}"); // Отправляем событие подключения sendSSE('connected', [ 'message' => 'Подключено к AI событиям', 'task_id' => $taskId ]); // Сначала проверяем - может ответ уже есть в Redis ключе (если публикация была до подписки) $cacheKey = "ai:response:cache:{$taskId}"; $cachedResponse = null; try { if (class_exists('Redis')) { $checkRedis = new Redis(); if ($checkRedis->connect('crm.clientright.ru', 6379)) { $checkRedis->auth('CRM_Redis_Pass_2025_Secure!'); $cachedResponse = $checkRedis->get($cacheKey); $checkRedis->close(); } } else { require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php'; $checkRedis = new Predis\Client([ 'scheme' => 'tcp', 'host' => 'crm.clientright.ru', 'port' => 6379, 'password' => 'CRM_Redis_Pass_2025_Secure!', ]); $cachedResponse = $checkRedis->get($cacheKey); } if ($cachedResponse) { error_log("[AI SSE] Found cached response in Redis key: {$cacheKey}"); $responseData = json_decode($cachedResponse, true); if ($responseData && isset($responseData['response'])) { sendSSE('response', [ 'task_id' => $taskId, 'response' => $responseData['response'] ]); exit(); } elseif ($cachedResponse && !$responseData) { // Если это просто строка sendSSE('response', [ 'task_id' => $taskId, 'response' => $cachedResponse ]); exit(); } } } catch (Exception $e) { error_log("[AI SSE] Error checking cache: " . $e->getMessage()); // Продолжаем с подпиской на канал } // Подключаемся к Redis if (class_exists('Redis')) { // Используем расширение Redis $redis = new Redis(); if (!$redis->connect('crm.clientright.ru', 6379)) { throw new Exception('Redis connection failed'); } $redis->auth('CRM_Redis_Pass_2025_Secure!'); // Отправляем начальное событие sendSSE('connected', [ 'message' => 'Подключено к AI событиям', 'task_id' => $taskId ]); // Подписываемся на канал для конкретного task_id $channel = "ai:response:{$taskId}"; error_log("[AI SSE] Subscribing to channel: {$channel}"); // Используем правильный метод subscribe для расширения Redis $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // Без таймаута $redis->subscribe([$channel], function($redis, $channel, $message) use ($taskId) { error_log("[AI SSE] Received message on channel {$channel}, length: " . strlen($message)); error_log("[AI SSE] Message preview: " . substr($message, 0, 500)); // Декодируем событие (может быть JSON или просто строка) $event = json_decode($message, true); // Если это не JSON, значит n8n отправил просто строку ответа - ОТЛИЧНО! if (!$event || !is_array($event)) { error_log("[AI SSE] Message is plain text (not JSON), treating as response"); ai_sse_debug_log("{$taskId}: plain text message received"); $cacheStored = ai_sse_cache_response($taskId, $message); if ($cacheStored) { error_log("[AI SSE] Cached plain text response for {$taskId}"); ai_sse_debug_log("{$taskId}: plain text response cached successfully"); } else { error_log("[AI SSE] Failed to cache plain text response for {$taskId}"); } // Отправляем ответ клиенту sendSSE('response', [ 'task_id' => $taskId, 'response' => $message // Используем сообщение как есть ]); error_log("[AI SSE] Response sent to client, unsubscribing"); $redis->unsubscribe([$channel]); return; } error_log("[AI SSE] Decoded event: " . json_encode($event, JSON_UNESCAPED_UNICODE)); // Проверяем что это сообщение для нашего task_id // Принимаем если task_id совпадает ИЛИ если это просто ответ (может быть без task_id) $eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null; $isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}"); if ($isOurTask) { // Отправляем событие клиенту if (isset($event['response']) && !empty($event['response'])) { error_log("[AI SSE] Sending response event for task {$taskId}"); ai_sse_debug_log("{$taskId}: JSON response received"); $cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed'); if ($cacheStored) { error_log("[AI SSE] Cached JSON response for {$taskId}"); ai_sse_debug_log("{$taskId}: cached JSON response"); } else { error_log("[AI SSE] Failed to cache JSON response for {$taskId}"); } sendSSE('response', [ 'task_id' => $taskId, 'response' => $event['response'] ]); // Отписываемся после получения ответа $redis->unsubscribe([$channel]); } elseif (isset($event['error']) && !empty($event['error'])) { error_log("[AI SSE] Sending error event for task {$taskId}"); sendSSE('error', [ 'task_id' => $taskId, 'error' => $event['error'] ]); // Отписываемся после получения ошибки $redis->unsubscribe([$channel]); } else { error_log("[AI SSE] Event received but no response/error field. Event keys: " . implode(', ', array_keys($event))); // Если есть другие поля, попробуем найти ответ $possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null; if ($possibleResponse) { error_log("[AI SSE] Found response in alternative field"); ai_sse_debug_log("{$taskId}: alternative response field detected"); $cacheStored = ai_sse_cache_response($taskId, $possibleResponse); if ($cacheStored) { error_log("[AI SSE] Cached alternative response for {$taskId}"); ai_sse_debug_log("{$taskId}: cached alternative response"); } else { error_log("[AI SSE] Failed to cache alternative response for {$taskId}"); } sendSSE('response', [ 'task_id' => $taskId, 'response' => $possibleResponse ]); $redis->unsubscribe([$channel]); } } } else { error_log("[AI SSE] Event task_id mismatch: expected {$taskId}, got {$eventTaskId}"); } // Проверяем не отключился ли клиент if (connection_aborted()) { error_log("[AI SSE] Client disconnected, unsubscribing"); $redis->unsubscribe([$channel]); } }); } else { // Используем Predis через Composer require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php'; $redis = new Predis\Client([ 'scheme' => 'tcp', 'host' => 'crm.clientright.ru', 'port' => 6379, 'password' => 'CRM_Redis_Pass_2025_Secure!', 'database' => 0, ]); // Отправляем начальное событие sendSSE('connected', [ 'message' => 'Подключено к AI событиям через Predis', 'task_id' => $taskId ]); // Подписываемся на канал $channel = "ai:response:{$taskId}"; $pubsub = $redis->pubSubLoop(); $pubsub->subscribe($channel); $lastHeartbeat = time(); foreach ($pubsub as $message) { // Heartbeat каждые 15 секунд if (time() - $lastHeartbeat > 15) { sendSSE('heartbeat', ['timestamp' => time()]); $lastHeartbeat = time(); } // Обрабатываем только сообщения if ($message->kind === 'message') { error_log("[AI SSE] Received message via Predis on channel {$channel}: " . substr($message->payload, 0, 200)); $event = json_decode($message->payload, true); // Если это не JSON, значит n8n отправил просто строку ответа if (!$event || !is_array($event)) { error_log("[AI SSE] Message is not JSON via Predis, treating as plain text response"); ai_sse_debug_log("{$taskId}: plain text message via Predis"); $cacheStored = ai_sse_cache_response($taskId, $message->payload); if ($cacheStored) { error_log("[AI SSE] Cached plain text response via Predis for {$taskId}"); ai_sse_debug_log("{$taskId}: cached plain text via Predis"); } else { error_log("[AI SSE] Failed to cache plain text response via Predis for {$taskId}"); } // Если это просто строка - отправляем как ответ sendSSE('response', [ 'task_id' => $taskId, 'response' => $message->payload // Используем сообщение как есть ]); $pubsub->unsubscribe($channel); break; } error_log("[AI SSE] Decoded event via Predis: " . json_encode($event, JSON_UNESCAPED_UNICODE)); // Проверяем что это сообщение для нашего task_id $eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null; $isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}"); if ($isOurTask) { if (isset($event['response']) && !empty($event['response'])) { error_log("[AI SSE] Sending response event via Predis for task {$taskId}"); ai_sse_debug_log("{$taskId}: JSON response via Predis"); $cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed'); if ($cacheStored) { error_log("[AI SSE] Cached JSON response via Predis for {$taskId}"); ai_sse_debug_log("{$taskId}: cached JSON via Predis"); } else { error_log("[AI SSE] Failed to cache JSON response via Predis for {$taskId}"); } sendSSE('response', [ 'task_id' => $taskId, 'response' => $event['response'] ]); // Отписываемся после получения ответа $pubsub->unsubscribe($channel); break; } elseif (isset($event['error']) && !empty($event['error'])) { error_log("[AI SSE] Sending error event via Predis for task {$taskId}"); sendSSE('error', [ 'task_id' => $taskId, 'error' => $event['error'] ]); // Отписываемся после получения ошибки $pubsub->unsubscribe($channel); break; } else { error_log("[AI SSE] Event received via Predis but no response/error field. Event keys: " . implode(', ', array_keys($event))); // Если есть другие поля, попробуем найти ответ $possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null; if ($possibleResponse) { error_log("[AI SSE] Found response in alternative field via Predis"); ai_sse_debug_log("{$taskId}: alternative response via Predis"); $cacheStored = ai_sse_cache_response($taskId, $possibleResponse); if ($cacheStored) { error_log("[AI SSE] Cached alternative response via Predis for {$taskId}"); ai_sse_debug_log("{$taskId}: cached alt response via Predis"); } else { error_log("[AI SSE] Failed to cache alternative response via Predis for {$taskId}"); } sendSSE('response', [ 'task_id' => $taskId, 'response' => $possibleResponse ]); $pubsub->unsubscribe($channel); break; } } } else { error_log("[AI SSE] Event task_id mismatch via Predis: expected {$taskId}, got {$eventTaskId}"); } } // Проверяем не отключился ли клиент if (connection_aborted()) { $pubsub->unsubscribe($channel); break; } } } } catch (Exception $e) { error_log("[AI SSE] Error: " . $e->getMessage()); sendSSE('error', ['message' => 'Redis error: ' . $e->getMessage()]); } ?>