Fix AI Drawer SSE Redis caching

This commit is contained in:
Fedor
2025-11-10 22:58:35 +03:00
parent 546ce83763
commit f770bd0e43

416
aiassist/ai_sse.php Normal file
View File

@@ -0,0 +1,416 @@
<?php
/**
* SSE endpoint для AI Drawer через Redis Pub/Sub
*
* Подписывается на Redis канал и отправляет ответы AI через SSE
*/
// Отключаем буферизацию
while (@ob_end_flush());
// Настройки SSE
header('Content-Type: text/event-stream');
header('Cache-Control: no-cache');
header('Connection: keep-alive');
header('Access-Control-Allow-Origin: *');
header('X-Accel-Buffering: no');
// Отключаем лимит времени
@ini_set('zlib.output_compression', 0);
@ini_set('implicit_flush', 1);
set_time_limit(0);
ignore_user_abort(false);
if (!function_exists('ai_sse_debug_log')) {
function ai_sse_debug_log($message) {
$logFile = '/var/www/fastuser/data/www/crm.clientright.ru/logs/ai_sse_debug.log';
$timestamp = date('Y-m-d H:i:s');
file_put_contents($logFile, "[{$timestamp}] {$message}\n", FILE_APPEND);
}
}
if (!function_exists('ai_sse_cache_response')) {
function ai_sse_cache_response($taskId, $responseText, $status = 'completed') {
$cacheKey = "ai:response:cache:{$taskId}";
$payload = [
'task_id' => $taskId,
'response' => $responseText,
'status' => $status,
'timestamp' => date('Y-m-d H:i:s')
];
$cacheValue = json_encode($payload, JSON_UNESCAPED_UNICODE);
ai_sse_debug_log("{$taskId}: caching response (status={$status})");
try {
if (class_exists('Redis')) {
$cacheRedis = new Redis();
if (!$cacheRedis->connect('crm.clientright.ru', 6379)) {
throw new Exception('Redis cache connection failed');
}
$cacheRedis->auth('CRM_Redis_Pass_2025_Secure!');
$result = $cacheRedis->setex($cacheKey, 300, $cacheValue);
$cacheRedis->close();
ai_sse_debug_log("{$taskId}: cache set via phpredis result=" . var_export($result, true));
return (bool)$result;
}
require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php';
$cacheRedis = new Predis\Client([
'scheme' => 'tcp',
'host' => 'crm.clientright.ru',
'port' => 6379,
'password' => 'CRM_Redis_Pass_2025_Secure!',
]);
$result = $cacheRedis->setex($cacheKey, 300, $cacheValue);
ai_sse_debug_log("{$taskId}: cache set via Predis result=" . var_export($result, true));
return (bool)$result;
} catch (Throwable $e) {
$message = "Failed to cache response for {$taskId}: " . $e->getMessage();
error_log("[AI SSE] {$message}");
ai_sse_debug_log("{$taskId}: {$message}");
return false;
}
}
}
// Отправляем начальный padding для Nginx
echo str_repeat(' ', 4096);
echo "\n\n";
flush();
// Функция для отправки события
function sendSSE($type, $data) {
// Для кастомных событий используем event:, для обычных data:
if (in_array($type, ['connected', 'response', 'error', 'heartbeat'])) {
echo "event: {$type}\n";
}
echo "data: " . json_encode([
'type' => $type,
'data' => $data,
'timestamp' => date('Y-m-d H:i:s')
], JSON_UNESCAPED_UNICODE) . "\n\n";
flush();
}
try {
// Получаем task_id из GET параметра
$taskId = $_GET['task_id'] ?? null;
if (!$taskId) {
sendSSE('error', ['message' => 'Missing task_id parameter']);
exit();
}
error_log("[AI SSE] Starting SSE connection for task_id: {$taskId}");
// Отправляем событие подключения
sendSSE('connected', [
'message' => 'Подключено к AI событиям',
'task_id' => $taskId
]);
// Сначала проверяем - может ответ уже есть в Redis ключе (если публикация была до подписки)
$cacheKey = "ai:response:cache:{$taskId}";
$cachedResponse = null;
try {
if (class_exists('Redis')) {
$checkRedis = new Redis();
if ($checkRedis->connect('crm.clientright.ru', 6379)) {
$checkRedis->auth('CRM_Redis_Pass_2025_Secure!');
$cachedResponse = $checkRedis->get($cacheKey);
$checkRedis->close();
}
} else {
require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php';
$checkRedis = new Predis\Client([
'scheme' => 'tcp',
'host' => 'crm.clientright.ru',
'port' => 6379,
'password' => 'CRM_Redis_Pass_2025_Secure!',
]);
$cachedResponse = $checkRedis->get($cacheKey);
}
if ($cachedResponse) {
error_log("[AI SSE] Found cached response in Redis key: {$cacheKey}");
$responseData = json_decode($cachedResponse, true);
if ($responseData && isset($responseData['response'])) {
sendSSE('response', [
'task_id' => $taskId,
'response' => $responseData['response']
]);
exit();
} elseif ($cachedResponse && !$responseData) {
// Если это просто строка
sendSSE('response', [
'task_id' => $taskId,
'response' => $cachedResponse
]);
exit();
}
}
} catch (Exception $e) {
error_log("[AI SSE] Error checking cache: " . $e->getMessage());
// Продолжаем с подпиской на канал
}
// Подключаемся к Redis
if (class_exists('Redis')) {
// Используем расширение Redis
$redis = new Redis();
if (!$redis->connect('crm.clientright.ru', 6379)) {
throw new Exception('Redis connection failed');
}
$redis->auth('CRM_Redis_Pass_2025_Secure!');
// Отправляем начальное событие
sendSSE('connected', [
'message' => 'Подключено к AI событиям',
'task_id' => $taskId
]);
// Подписываемся на канал для конкретного task_id
$channel = "ai:response:{$taskId}";
error_log("[AI SSE] Subscribing to channel: {$channel}");
// Используем правильный метод subscribe для расширения Redis
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // Без таймаута
$redis->subscribe([$channel], function($redis, $channel, $message) use ($taskId) {
error_log("[AI SSE] Received message on channel {$channel}, length: " . strlen($message));
error_log("[AI SSE] Message preview: " . substr($message, 0, 500));
// Декодируем событие (может быть JSON или просто строка)
$event = json_decode($message, true);
// Если это не JSON, значит n8n отправил просто строку ответа - ОТЛИЧНО!
if (!$event || !is_array($event)) {
error_log("[AI SSE] Message is plain text (not JSON), treating as response");
ai_sse_debug_log("{$taskId}: plain text message received");
$cacheStored = ai_sse_cache_response($taskId, $message);
if ($cacheStored) {
error_log("[AI SSE] Cached plain text response for {$taskId}");
ai_sse_debug_log("{$taskId}: plain text response cached successfully");
} else {
error_log("[AI SSE] Failed to cache plain text response for {$taskId}");
}
// Отправляем ответ клиенту
sendSSE('response', [
'task_id' => $taskId,
'response' => $message // Используем сообщение как есть
]);
error_log("[AI SSE] Response sent to client, unsubscribing");
$redis->unsubscribe([$channel]);
return;
}
error_log("[AI SSE] Decoded event: " . json_encode($event, JSON_UNESCAPED_UNICODE));
// Проверяем что это сообщение для нашего task_id
// Принимаем если task_id совпадает ИЛИ если это просто ответ (может быть без task_id)
$eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null;
$isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}");
if ($isOurTask) {
// Отправляем событие клиенту
if (isset($event['response']) && !empty($event['response'])) {
error_log("[AI SSE] Sending response event for task {$taskId}");
ai_sse_debug_log("{$taskId}: JSON response received");
$cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed');
if ($cacheStored) {
error_log("[AI SSE] Cached JSON response for {$taskId}");
ai_sse_debug_log("{$taskId}: cached JSON response");
} else {
error_log("[AI SSE] Failed to cache JSON response for {$taskId}");
}
sendSSE('response', [
'task_id' => $taskId,
'response' => $event['response']
]);
// Отписываемся после получения ответа
$redis->unsubscribe([$channel]);
} elseif (isset($event['error']) && !empty($event['error'])) {
error_log("[AI SSE] Sending error event for task {$taskId}");
sendSSE('error', [
'task_id' => $taskId,
'error' => $event['error']
]);
// Отписываемся после получения ошибки
$redis->unsubscribe([$channel]);
} else {
error_log("[AI SSE] Event received but no response/error field. Event keys: " . implode(', ', array_keys($event)));
// Если есть другие поля, попробуем найти ответ
$possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null;
if ($possibleResponse) {
error_log("[AI SSE] Found response in alternative field");
ai_sse_debug_log("{$taskId}: alternative response field detected");
$cacheStored = ai_sse_cache_response($taskId, $possibleResponse);
if ($cacheStored) {
error_log("[AI SSE] Cached alternative response for {$taskId}");
ai_sse_debug_log("{$taskId}: cached alternative response");
} else {
error_log("[AI SSE] Failed to cache alternative response for {$taskId}");
}
sendSSE('response', [
'task_id' => $taskId,
'response' => $possibleResponse
]);
$redis->unsubscribe([$channel]);
}
}
} else {
error_log("[AI SSE] Event task_id mismatch: expected {$taskId}, got {$eventTaskId}");
}
// Проверяем не отключился ли клиент
if (connection_aborted()) {
error_log("[AI SSE] Client disconnected, unsubscribing");
$redis->unsubscribe([$channel]);
}
});
} else {
// Используем Predis через Composer
require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php';
$redis = new Predis\Client([
'scheme' => 'tcp',
'host' => 'crm.clientright.ru',
'port' => 6379,
'password' => 'CRM_Redis_Pass_2025_Secure!',
'database' => 0,
]);
// Отправляем начальное событие
sendSSE('connected', [
'message' => 'Подключено к AI событиям через Predis',
'task_id' => $taskId
]);
// Подписываемся на канал
$channel = "ai:response:{$taskId}";
$pubsub = $redis->pubSubLoop();
$pubsub->subscribe($channel);
$lastHeartbeat = time();
foreach ($pubsub as $message) {
// Heartbeat каждые 15 секунд
if (time() - $lastHeartbeat > 15) {
sendSSE('heartbeat', ['timestamp' => time()]);
$lastHeartbeat = time();
}
// Обрабатываем только сообщения
if ($message->kind === 'message') {
error_log("[AI SSE] Received message via Predis on channel {$channel}: " . substr($message->payload, 0, 200));
$event = json_decode($message->payload, true);
// Если это не JSON, значит n8n отправил просто строку ответа
if (!$event || !is_array($event)) {
error_log("[AI SSE] Message is not JSON via Predis, treating as plain text response");
ai_sse_debug_log("{$taskId}: plain text message via Predis");
$cacheStored = ai_sse_cache_response($taskId, $message->payload);
if ($cacheStored) {
error_log("[AI SSE] Cached plain text response via Predis for {$taskId}");
ai_sse_debug_log("{$taskId}: cached plain text via Predis");
} else {
error_log("[AI SSE] Failed to cache plain text response via Predis for {$taskId}");
}
// Если это просто строка - отправляем как ответ
sendSSE('response', [
'task_id' => $taskId,
'response' => $message->payload // Используем сообщение как есть
]);
$pubsub->unsubscribe($channel);
break;
}
error_log("[AI SSE] Decoded event via Predis: " . json_encode($event, JSON_UNESCAPED_UNICODE));
// Проверяем что это сообщение для нашего task_id
$eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null;
$isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}");
if ($isOurTask) {
if (isset($event['response']) && !empty($event['response'])) {
error_log("[AI SSE] Sending response event via Predis for task {$taskId}");
ai_sse_debug_log("{$taskId}: JSON response via Predis");
$cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed');
if ($cacheStored) {
error_log("[AI SSE] Cached JSON response via Predis for {$taskId}");
ai_sse_debug_log("{$taskId}: cached JSON via Predis");
} else {
error_log("[AI SSE] Failed to cache JSON response via Predis for {$taskId}");
}
sendSSE('response', [
'task_id' => $taskId,
'response' => $event['response']
]);
// Отписываемся после получения ответа
$pubsub->unsubscribe($channel);
break;
} elseif (isset($event['error']) && !empty($event['error'])) {
error_log("[AI SSE] Sending error event via Predis for task {$taskId}");
sendSSE('error', [
'task_id' => $taskId,
'error' => $event['error']
]);
// Отписываемся после получения ошибки
$pubsub->unsubscribe($channel);
break;
} else {
error_log("[AI SSE] Event received via Predis but no response/error field. Event keys: " . implode(', ', array_keys($event)));
// Если есть другие поля, попробуем найти ответ
$possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null;
if ($possibleResponse) {
error_log("[AI SSE] Found response in alternative field via Predis");
ai_sse_debug_log("{$taskId}: alternative response via Predis");
$cacheStored = ai_sse_cache_response($taskId, $possibleResponse);
if ($cacheStored) {
error_log("[AI SSE] Cached alternative response via Predis for {$taskId}");
ai_sse_debug_log("{$taskId}: cached alt response via Predis");
} else {
error_log("[AI SSE] Failed to cache alternative response via Predis for {$taskId}");
}
sendSSE('response', [
'task_id' => $taskId,
'response' => $possibleResponse
]);
$pubsub->unsubscribe($channel);
break;
}
}
} else {
error_log("[AI SSE] Event task_id mismatch via Predis: expected {$taskId}, got {$eventTaskId}");
}
}
// Проверяем не отключился ли клиент
if (connection_aborted()) {
$pubsub->unsubscribe($channel);
break;
}
}
}
} catch (Exception $e) {
error_log("[AI SSE] Error: " . $e->getMessage());
sendSSE('error', ['message' => 'Redis error: ' . $e->getMessage()]);
}
?>