Files
crm.clientright.ru/aiassist/ai_sse.php
2025-11-10 22:58:35 +03:00

417 lines
19 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
/**
* SSE endpoint для AI Drawer через Redis Pub/Sub
*
* Подписывается на Redis канал и отправляет ответы AI через SSE
*/
// Отключаем буферизацию
while (@ob_end_flush());
// Настройки SSE
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');
header('Access-Control-Allow-Origin: *');
header('X-Accel-Buffering: no');
// Отключаем лимит времени
@ini_set('zlib.output_compression', 0);
@ini_set('implicit_flush', 1);
set_time_limit(0);
ignore_user_abort(false);
if (!function_exists('ai_sse_debug_log')) {
function ai_sse_debug_log($message) {
$logFile = '/var/www/fastuser/data/www/crm.clientright.ru/logs/ai_sse_debug.log';
$timestamp = date('Y-m-d H:i:s');
file_put_contents($logFile, "[{$timestamp}] {$message}\n", FILE_APPEND);
}
}
if (!function_exists('ai_sse_cache_response')) {
function ai_sse_cache_response($taskId, $responseText, $status = 'completed') {
$cacheKey = "ai:response:cache:{$taskId}";
$payload = [
'task_id' => $taskId,
'response' => $responseText,
'status' => $status,
'timestamp' => date('Y-m-d H:i:s')
];
$cacheValue = json_encode($payload, JSON_UNESCAPED_UNICODE);
ai_sse_debug_log("{$taskId}: caching response (status={$status})");
try {
if (class_exists('Redis')) {
$cacheRedis = new Redis();
if (!$cacheRedis->connect('crm.clientright.ru', 6379)) {
throw new Exception('Redis cache connection failed');
}
$cacheRedis->auth('CRM_Redis_Pass_2025_Secure!');
$result = $cacheRedis->setex($cacheKey, 300, $cacheValue);
$cacheRedis->close();
ai_sse_debug_log("{$taskId}: cache set via phpredis result=" . var_export($result, true));
return (bool)$result;
}
require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php';
$cacheRedis = new Predis\Client([
'scheme' => 'tcp',
'host' => 'crm.clientright.ru',
'port' => 6379,
'password' => 'CRM_Redis_Pass_2025_Secure!',
]);
$result = $cacheRedis->setex($cacheKey, 300, $cacheValue);
ai_sse_debug_log("{$taskId}: cache set via Predis result=" . var_export($result, true));
return (bool)$result;
} catch (Throwable $e) {
$message = "Failed to cache response for {$taskId}: " . $e->getMessage();
error_log("[AI SSE] {$message}");
ai_sse_debug_log("{$taskId}: {$message}");
return false;
}
}
}
// Отправляем начальный padding для Nginx
echo str_repeat(' ', 4096);
echo "\n\n";
flush();
// Функция для отправки события
function sendSSE($type, $data) {
// Для кастомных событий используем event:, для обычных data:
if (in_array($type, ['connected', 'response', 'error', 'heartbeat'])) {
echo "event: {$type}\n";
}
echo "data: " . json_encode([
'type' => $type,
'data' => $data,
'timestamp' => date('Y-m-d H:i:s')
], JSON_UNESCAPED_UNICODE) . "\n\n";
flush();
}
try {
// Получаем task_id из GET параметра
$taskId = $_GET['task_id'] ?? null;
if (!$taskId) {
sendSSE('error', ['message' => 'Missing task_id parameter']);
exit();
}
error_log("[AI SSE] Starting SSE connection for task_id: {$taskId}");
// Отправляем событие подключения
sendSSE('connected', [
'message' => 'Подключено к AI событиям',
'task_id' => $taskId
]);
// Сначала проверяем - может ответ уже есть в Redis ключе (если публикация была до подписки)
$cacheKey = "ai:response:cache:{$taskId}";
$cachedResponse = null;
try {
if (class_exists('Redis')) {
$checkRedis = new Redis();
if ($checkRedis->connect('crm.clientright.ru', 6379)) {
$checkRedis->auth('CRM_Redis_Pass_2025_Secure!');
$cachedResponse = $checkRedis->get($cacheKey);
$checkRedis->close();
}
} else {
require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php';
$checkRedis = new Predis\Client([
'scheme' => 'tcp',
'host' => 'crm.clientright.ru',
'port' => 6379,
'password' => 'CRM_Redis_Pass_2025_Secure!',
]);
$cachedResponse = $checkRedis->get($cacheKey);
}
if ($cachedResponse) {
error_log("[AI SSE] Found cached response in Redis key: {$cacheKey}");
$responseData = json_decode($cachedResponse, true);
if ($responseData && isset($responseData['response'])) {
sendSSE('response', [
'task_id' => $taskId,
'response' => $responseData['response']
]);
exit();
} elseif ($cachedResponse && !$responseData) {
// Если это просто строка
sendSSE('response', [
'task_id' => $taskId,
'response' => $cachedResponse
]);
exit();
}
}
} catch (Exception $e) {
error_log("[AI SSE] Error checking cache: " . $e->getMessage());
// Продолжаем с подпиской на канал
}
// Подключаемся к Redis
if (class_exists('Redis')) {
// Используем расширение Redis
$redis = new Redis();
if (!$redis->connect('crm.clientright.ru', 6379)) {
throw new Exception('Redis connection failed');
}
$redis->auth('CRM_Redis_Pass_2025_Secure!');
// Отправляем начальное событие
sendSSE('connected', [
'message' => 'Подключено к AI событиям',
'task_id' => $taskId
]);
// Подписываемся на канал для конкретного task_id
$channel = "ai:response:{$taskId}";
error_log("[AI SSE] Subscribing to channel: {$channel}");
// Используем правильный метод subscribe для расширения Redis
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // Без таймаута
$redis->subscribe([$channel], function($redis, $channel, $message) use ($taskId) {
error_log("[AI SSE] Received message on channel {$channel}, length: " . strlen($message));
error_log("[AI SSE] Message preview: " . substr($message, 0, 500));
// Декодируем событие (может быть JSON или просто строка)
$event = json_decode($message, true);
// Если это не JSON, значит n8n отправил просто строку ответа - ОТЛИЧНО!
if (!$event || !is_array($event)) {
error_log("[AI SSE] Message is plain text (not JSON), treating as response");
ai_sse_debug_log("{$taskId}: plain text message received");
$cacheStored = ai_sse_cache_response($taskId, $message);
if ($cacheStored) {
error_log("[AI SSE] Cached plain text response for {$taskId}");
ai_sse_debug_log("{$taskId}: plain text response cached successfully");
} else {
error_log("[AI SSE] Failed to cache plain text response for {$taskId}");
}
// Отправляем ответ клиенту
sendSSE('response', [
'task_id' => $taskId,
'response' => $message // Используем сообщение как есть
]);
error_log("[AI SSE] Response sent to client, unsubscribing");
$redis->unsubscribe([$channel]);
return;
}
error_log("[AI SSE] Decoded event: " . json_encode($event, JSON_UNESCAPED_UNICODE));
// Проверяем что это сообщение для нашего task_id
// Принимаем если task_id совпадает ИЛИ если это просто ответ (может быть без task_id)
$eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null;
$isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}");
if ($isOurTask) {
// Отправляем событие клиенту
if (isset($event['response']) && !empty($event['response'])) {
error_log("[AI SSE] Sending response event for task {$taskId}");
ai_sse_debug_log("{$taskId}: JSON response received");
$cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed');
if ($cacheStored) {
error_log("[AI SSE] Cached JSON response for {$taskId}");
ai_sse_debug_log("{$taskId}: cached JSON response");
} else {
error_log("[AI SSE] Failed to cache JSON response for {$taskId}");
}
sendSSE('response', [
'task_id' => $taskId,
'response' => $event['response']
]);
// Отписываемся после получения ответа
$redis->unsubscribe([$channel]);
} elseif (isset($event['error']) && !empty($event['error'])) {
error_log("[AI SSE] Sending error event for task {$taskId}");
sendSSE('error', [
'task_id' => $taskId,
'error' => $event['error']
]);
// Отписываемся после получения ошибки
$redis->unsubscribe([$channel]);
} else {
error_log("[AI SSE] Event received but no response/error field. Event keys: " . implode(', ', array_keys($event)));
// Если есть другие поля, попробуем найти ответ
$possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null;
if ($possibleResponse) {
error_log("[AI SSE] Found response in alternative field");
ai_sse_debug_log("{$taskId}: alternative response field detected");
$cacheStored = ai_sse_cache_response($taskId, $possibleResponse);
if ($cacheStored) {
error_log("[AI SSE] Cached alternative response for {$taskId}");
ai_sse_debug_log("{$taskId}: cached alternative response");
} else {
error_log("[AI SSE] Failed to cache alternative response for {$taskId}");
}
sendSSE('response', [
'task_id' => $taskId,
'response' => $possibleResponse
]);
$redis->unsubscribe([$channel]);
}
}
} else {
error_log("[AI SSE] Event task_id mismatch: expected {$taskId}, got {$eventTaskId}");
}
// Проверяем не отключился ли клиент
if (connection_aborted()) {
error_log("[AI SSE] Client disconnected, unsubscribing");
$redis->unsubscribe([$channel]);
}
});
} else {
// Используем Predis через Composer
require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php';
$redis = new Predis\Client([
'scheme' => 'tcp',
'host' => 'crm.clientright.ru',
'port' => 6379,
'password' => 'CRM_Redis_Pass_2025_Secure!',
'database' => 0,
]);
// Отправляем начальное событие
sendSSE('connected', [
'message' => 'Подключено к AI событиям через Predis',
'task_id' => $taskId
]);
// Подписываемся на канал
$channel = "ai:response:{$taskId}";
$pubsub = $redis->pubSubLoop();
$pubsub->subscribe($channel);
$lastHeartbeat = time();
foreach ($pubsub as $message) {
// Heartbeat каждые 15 секунд
if (time() - $lastHeartbeat > 15) {
sendSSE('heartbeat', ['timestamp' => time()]);
$lastHeartbeat = time();
}
// Обрабатываем только сообщения
if ($message->kind === 'message') {
error_log("[AI SSE] Received message via Predis on channel {$channel}: " . substr($message->payload, 0, 200));
$event = json_decode($message->payload, true);
// Если это не JSON, значит n8n отправил просто строку ответа
if (!$event || !is_array($event)) {
error_log("[AI SSE] Message is not JSON via Predis, treating as plain text response");
ai_sse_debug_log("{$taskId}: plain text message via Predis");
$cacheStored = ai_sse_cache_response($taskId, $message->payload);
if ($cacheStored) {
error_log("[AI SSE] Cached plain text response via Predis for {$taskId}");
ai_sse_debug_log("{$taskId}: cached plain text via Predis");
} else {
error_log("[AI SSE] Failed to cache plain text response via Predis for {$taskId}");
}
// Если это просто строка - отправляем как ответ
sendSSE('response', [
'task_id' => $taskId,
'response' => $message->payload // Используем сообщение как есть
]);
$pubsub->unsubscribe($channel);
break;
}
error_log("[AI SSE] Decoded event via Predis: " . json_encode($event, JSON_UNESCAPED_UNICODE));
// Проверяем что это сообщение для нашего task_id
$eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null;
$isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}");
if ($isOurTask) {
if (isset($event['response']) && !empty($event['response'])) {
error_log("[AI SSE] Sending response event via Predis for task {$taskId}");
ai_sse_debug_log("{$taskId}: JSON response via Predis");
$cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed');
if ($cacheStored) {
error_log("[AI SSE] Cached JSON response via Predis for {$taskId}");
ai_sse_debug_log("{$taskId}: cached JSON via Predis");
} else {
error_log("[AI SSE] Failed to cache JSON response via Predis for {$taskId}");
}
sendSSE('response', [
'task_id' => $taskId,
'response' => $event['response']
]);
// Отписываемся после получения ответа
$pubsub->unsubscribe($channel);
break;
} elseif (isset($event['error']) && !empty($event['error'])) {
error_log("[AI SSE] Sending error event via Predis for task {$taskId}");
sendSSE('error', [
'task_id' => $taskId,
'error' => $event['error']
]);
// Отписываемся после получения ошибки
$pubsub->unsubscribe($channel);
break;
} else {
error_log("[AI SSE] Event received via Predis but no response/error field. Event keys: " . implode(', ', array_keys($event)));
// Если есть другие поля, попробуем найти ответ
$possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null;
if ($possibleResponse) {
error_log("[AI SSE] Found response in alternative field via Predis");
ai_sse_debug_log("{$taskId}: alternative response via Predis");
$cacheStored = ai_sse_cache_response($taskId, $possibleResponse);
if ($cacheStored) {
error_log("[AI SSE] Cached alternative response via Predis for {$taskId}");
ai_sse_debug_log("{$taskId}: cached alt response via Predis");
} else {
error_log("[AI SSE] Failed to cache alternative response via Predis for {$taskId}");
}
sendSSE('response', [
'task_id' => $taskId,
'response' => $possibleResponse
]);
$pubsub->unsubscribe($channel);
break;
}
}
} else {
error_log("[AI SSE] Event task_id mismatch via Predis: expected {$taskId}, got {$eventTaskId}");
}
}
// Проверяем не отключился ли клиент
if (connection_aborted()) {
$pubsub->unsubscribe($channel);
break;
}
}
}
} catch (Exception $e) {
error_log("[AI SSE] Error: " . $e->getMessage());
sendSSE('error', ['message' => 'Redis error: ' . $e->getMessage()]);
}
?>