417 lines
19 KiB
PHP
417 lines
19 KiB
PHP
<?php
|
||
/**
|
||
* SSE endpoint для AI Drawer через Redis Pub/Sub
|
||
*
|
||
* Подписывается на Redis канал и отправляет ответы AI через SSE
|
||
*/
|
||
|
||
// Отключаем буферизацию
|
||
while (@ob_end_flush());
|
||
|
||
// Настройки SSE
|
||
header('Content-Type: text/event-stream');
|
||
header('Cache-Control: no-cache');
|
||
header('Connection: keep-alive');
|
||
header('Access-Control-Allow-Origin: *');
|
||
header('X-Accel-Buffering: no');
|
||
|
||
// Отключаем лимит времени
|
||
@ini_set('zlib.output_compression', 0);
|
||
@ini_set('implicit_flush', 1);
|
||
set_time_limit(0);
|
||
ignore_user_abort(false);
|
||
|
||
if (!function_exists('ai_sse_debug_log')) {
|
||
function ai_sse_debug_log($message) {
|
||
$logFile = '/var/www/fastuser/data/www/crm.clientright.ru/logs/ai_sse_debug.log';
|
||
$timestamp = date('Y-m-d H:i:s');
|
||
file_put_contents($logFile, "[{$timestamp}] {$message}\n", FILE_APPEND);
|
||
}
|
||
}
|
||
|
||
if (!function_exists('ai_sse_cache_response')) {
|
||
function ai_sse_cache_response($taskId, $responseText, $status = 'completed') {
|
||
$cacheKey = "ai:response:cache:{$taskId}";
|
||
$payload = [
|
||
'task_id' => $taskId,
|
||
'response' => $responseText,
|
||
'status' => $status,
|
||
'timestamp' => date('Y-m-d H:i:s')
|
||
];
|
||
$cacheValue = json_encode($payload, JSON_UNESCAPED_UNICODE);
|
||
ai_sse_debug_log("{$taskId}: caching response (status={$status})");
|
||
try {
|
||
if (class_exists('Redis')) {
|
||
$cacheRedis = new Redis();
|
||
if (!$cacheRedis->connect('crm.clientright.ru', 6379)) {
|
||
throw new Exception('Redis cache connection failed');
|
||
}
|
||
$cacheRedis->auth('CRM_Redis_Pass_2025_Secure!');
|
||
$result = $cacheRedis->setex($cacheKey, 300, $cacheValue);
|
||
$cacheRedis->close();
|
||
ai_sse_debug_log("{$taskId}: cache set via phpredis result=" . var_export($result, true));
|
||
return (bool)$result;
|
||
}
|
||
|
||
require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php';
|
||
$cacheRedis = new Predis\Client([
|
||
'scheme' => 'tcp',
|
||
'host' => 'crm.clientright.ru',
|
||
'port' => 6379,
|
||
'password' => 'CRM_Redis_Pass_2025_Secure!',
|
||
]);
|
||
$result = $cacheRedis->setex($cacheKey, 300, $cacheValue);
|
||
ai_sse_debug_log("{$taskId}: cache set via Predis result=" . var_export($result, true));
|
||
return (bool)$result;
|
||
} catch (Throwable $e) {
|
||
$message = "Failed to cache response for {$taskId}: " . $e->getMessage();
|
||
error_log("[AI SSE] {$message}");
|
||
ai_sse_debug_log("{$taskId}: {$message}");
|
||
return false;
|
||
}
|
||
}
|
||
}
|
||
|
||
// Отправляем начальный padding для Nginx
|
||
echo str_repeat(' ', 4096);
|
||
echo "\n\n";
|
||
flush();
|
||
|
||
// Функция для отправки события
|
||
function sendSSE($type, $data) {
|
||
// Для кастомных событий используем event:, для обычных data:
|
||
if (in_array($type, ['connected', 'response', 'error', 'heartbeat'])) {
|
||
echo "event: {$type}\n";
|
||
}
|
||
echo "data: " . json_encode([
|
||
'type' => $type,
|
||
'data' => $data,
|
||
'timestamp' => date('Y-m-d H:i:s')
|
||
], JSON_UNESCAPED_UNICODE) . "\n\n";
|
||
flush();
|
||
}
|
||
|
||
try {
|
||
// Получаем task_id из GET параметра
|
||
$taskId = $_GET['task_id'] ?? null;
|
||
|
||
if (!$taskId) {
|
||
sendSSE('error', ['message' => 'Missing task_id parameter']);
|
||
exit();
|
||
}
|
||
|
||
error_log("[AI SSE] Starting SSE connection for task_id: {$taskId}");
|
||
|
||
// Отправляем событие подключения
|
||
sendSSE('connected', [
|
||
'message' => 'Подключено к AI событиям',
|
||
'task_id' => $taskId
|
||
]);
|
||
|
||
// Сначала проверяем - может ответ уже есть в Redis ключе (если публикация была до подписки)
|
||
$cacheKey = "ai:response:cache:{$taskId}";
|
||
$cachedResponse = null;
|
||
|
||
try {
|
||
if (class_exists('Redis')) {
|
||
$checkRedis = new Redis();
|
||
if ($checkRedis->connect('crm.clientright.ru', 6379)) {
|
||
$checkRedis->auth('CRM_Redis_Pass_2025_Secure!');
|
||
$cachedResponse = $checkRedis->get($cacheKey);
|
||
$checkRedis->close();
|
||
}
|
||
} else {
|
||
require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php';
|
||
$checkRedis = new Predis\Client([
|
||
'scheme' => 'tcp',
|
||
'host' => 'crm.clientright.ru',
|
||
'port' => 6379,
|
||
'password' => 'CRM_Redis_Pass_2025_Secure!',
|
||
]);
|
||
$cachedResponse = $checkRedis->get($cacheKey);
|
||
}
|
||
|
||
if ($cachedResponse) {
|
||
error_log("[AI SSE] Found cached response in Redis key: {$cacheKey}");
|
||
$responseData = json_decode($cachedResponse, true);
|
||
|
||
if ($responseData && isset($responseData['response'])) {
|
||
sendSSE('response', [
|
||
'task_id' => $taskId,
|
||
'response' => $responseData['response']
|
||
]);
|
||
exit();
|
||
} elseif ($cachedResponse && !$responseData) {
|
||
// Если это просто строка
|
||
sendSSE('response', [
|
||
'task_id' => $taskId,
|
||
'response' => $cachedResponse
|
||
]);
|
||
exit();
|
||
}
|
||
}
|
||
} catch (Exception $e) {
|
||
error_log("[AI SSE] Error checking cache: " . $e->getMessage());
|
||
// Продолжаем с подпиской на канал
|
||
}
|
||
|
||
// Подключаемся к Redis
|
||
if (class_exists('Redis')) {
|
||
// Используем расширение Redis
|
||
$redis = new Redis();
|
||
if (!$redis->connect('crm.clientright.ru', 6379)) {
|
||
throw new Exception('Redis connection failed');
|
||
}
|
||
$redis->auth('CRM_Redis_Pass_2025_Secure!');
|
||
|
||
// Отправляем начальное событие
|
||
sendSSE('connected', [
|
||
'message' => 'Подключено к AI событиям',
|
||
'task_id' => $taskId
|
||
]);
|
||
|
||
// Подписываемся на канал для конкретного task_id
|
||
$channel = "ai:response:{$taskId}";
|
||
error_log("[AI SSE] Subscribing to channel: {$channel}");
|
||
|
||
// Используем правильный метод subscribe для расширения Redis
|
||
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // Без таймаута
|
||
|
||
$redis->subscribe([$channel], function($redis, $channel, $message) use ($taskId) {
|
||
error_log("[AI SSE] Received message on channel {$channel}, length: " . strlen($message));
|
||
error_log("[AI SSE] Message preview: " . substr($message, 0, 500));
|
||
|
||
// Декодируем событие (может быть JSON или просто строка)
|
||
$event = json_decode($message, true);
|
||
|
||
// Если это не JSON, значит n8n отправил просто строку ответа - ОТЛИЧНО!
|
||
if (!$event || !is_array($event)) {
|
||
error_log("[AI SSE] Message is plain text (not JSON), treating as response");
|
||
ai_sse_debug_log("{$taskId}: plain text message received");
|
||
|
||
$cacheStored = ai_sse_cache_response($taskId, $message);
|
||
if ($cacheStored) {
|
||
error_log("[AI SSE] Cached plain text response for {$taskId}");
|
||
ai_sse_debug_log("{$taskId}: plain text response cached successfully");
|
||
} else {
|
||
error_log("[AI SSE] Failed to cache plain text response for {$taskId}");
|
||
}
|
||
|
||
// Отправляем ответ клиенту
|
||
sendSSE('response', [
|
||
'task_id' => $taskId,
|
||
'response' => $message // Используем сообщение как есть
|
||
]);
|
||
error_log("[AI SSE] Response sent to client, unsubscribing");
|
||
$redis->unsubscribe([$channel]);
|
||
return;
|
||
}
|
||
|
||
error_log("[AI SSE] Decoded event: " . json_encode($event, JSON_UNESCAPED_UNICODE));
|
||
|
||
// Проверяем что это сообщение для нашего task_id
|
||
// Принимаем если task_id совпадает ИЛИ если это просто ответ (может быть без task_id)
|
||
$eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null;
|
||
$isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}");
|
||
|
||
if ($isOurTask) {
|
||
// Отправляем событие клиенту
|
||
if (isset($event['response']) && !empty($event['response'])) {
|
||
error_log("[AI SSE] Sending response event for task {$taskId}");
|
||
ai_sse_debug_log("{$taskId}: JSON response received");
|
||
|
||
$cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed');
|
||
if ($cacheStored) {
|
||
error_log("[AI SSE] Cached JSON response for {$taskId}");
|
||
ai_sse_debug_log("{$taskId}: cached JSON response");
|
||
} else {
|
||
error_log("[AI SSE] Failed to cache JSON response for {$taskId}");
|
||
}
|
||
|
||
sendSSE('response', [
|
||
'task_id' => $taskId,
|
||
'response' => $event['response']
|
||
]);
|
||
// Отписываемся после получения ответа
|
||
$redis->unsubscribe([$channel]);
|
||
} elseif (isset($event['error']) && !empty($event['error'])) {
|
||
error_log("[AI SSE] Sending error event for task {$taskId}");
|
||
sendSSE('error', [
|
||
'task_id' => $taskId,
|
||
'error' => $event['error']
|
||
]);
|
||
// Отписываемся после получения ошибки
|
||
$redis->unsubscribe([$channel]);
|
||
} else {
|
||
error_log("[AI SSE] Event received but no response/error field. Event keys: " . implode(', ', array_keys($event)));
|
||
// Если есть другие поля, попробуем найти ответ
|
||
$possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null;
|
||
if ($possibleResponse) {
|
||
error_log("[AI SSE] Found response in alternative field");
|
||
ai_sse_debug_log("{$taskId}: alternative response field detected");
|
||
|
||
$cacheStored = ai_sse_cache_response($taskId, $possibleResponse);
|
||
if ($cacheStored) {
|
||
error_log("[AI SSE] Cached alternative response for {$taskId}");
|
||
ai_sse_debug_log("{$taskId}: cached alternative response");
|
||
} else {
|
||
error_log("[AI SSE] Failed to cache alternative response for {$taskId}");
|
||
}
|
||
|
||
sendSSE('response', [
|
||
'task_id' => $taskId,
|
||
'response' => $possibleResponse
|
||
]);
|
||
$redis->unsubscribe([$channel]);
|
||
}
|
||
}
|
||
} else {
|
||
error_log("[AI SSE] Event task_id mismatch: expected {$taskId}, got {$eventTaskId}");
|
||
}
|
||
|
||
// Проверяем не отключился ли клиент
|
||
if (connection_aborted()) {
|
||
error_log("[AI SSE] Client disconnected, unsubscribing");
|
||
$redis->unsubscribe([$channel]);
|
||
}
|
||
});
|
||
|
||
} else {
|
||
// Используем Predis через Composer
|
||
require_once '/var/www/fastuser/data/www/crm.clientright.ru/vendor/autoload.php';
|
||
|
||
$redis = new Predis\Client([
|
||
'scheme' => 'tcp',
|
||
'host' => 'crm.clientright.ru',
|
||
'port' => 6379,
|
||
'password' => 'CRM_Redis_Pass_2025_Secure!',
|
||
'database' => 0,
|
||
]);
|
||
|
||
// Отправляем начальное событие
|
||
sendSSE('connected', [
|
||
'message' => 'Подключено к AI событиям через Predis',
|
||
'task_id' => $taskId
|
||
]);
|
||
|
||
// Подписываемся на канал
|
||
$channel = "ai:response:{$taskId}";
|
||
$pubsub = $redis->pubSubLoop();
|
||
$pubsub->subscribe($channel);
|
||
|
||
$lastHeartbeat = time();
|
||
|
||
foreach ($pubsub as $message) {
|
||
// Heartbeat каждые 15 секунд
|
||
if (time() - $lastHeartbeat > 15) {
|
||
sendSSE('heartbeat', ['timestamp' => time()]);
|
||
$lastHeartbeat = time();
|
||
}
|
||
|
||
// Обрабатываем только сообщения
|
||
if ($message->kind === 'message') {
|
||
error_log("[AI SSE] Received message via Predis on channel {$channel}: " . substr($message->payload, 0, 200));
|
||
|
||
$event = json_decode($message->payload, true);
|
||
|
||
// Если это не JSON, значит n8n отправил просто строку ответа
|
||
if (!$event || !is_array($event)) {
|
||
error_log("[AI SSE] Message is not JSON via Predis, treating as plain text response");
|
||
ai_sse_debug_log("{$taskId}: plain text message via Predis");
|
||
|
||
$cacheStored = ai_sse_cache_response($taskId, $message->payload);
|
||
if ($cacheStored) {
|
||
error_log("[AI SSE] Cached plain text response via Predis for {$taskId}");
|
||
ai_sse_debug_log("{$taskId}: cached plain text via Predis");
|
||
} else {
|
||
error_log("[AI SSE] Failed to cache plain text response via Predis for {$taskId}");
|
||
}
|
||
|
||
// Если это просто строка - отправляем как ответ
|
||
sendSSE('response', [
|
||
'task_id' => $taskId,
|
||
'response' => $message->payload // Используем сообщение как есть
|
||
]);
|
||
$pubsub->unsubscribe($channel);
|
||
break;
|
||
}
|
||
|
||
error_log("[AI SSE] Decoded event via Predis: " . json_encode($event, JSON_UNESCAPED_UNICODE));
|
||
|
||
// Проверяем что это сообщение для нашего task_id
|
||
$eventTaskId = $event['task_id'] ?? $event['taskId'] ?? null;
|
||
$isOurTask = ($eventTaskId === $taskId) || ($eventTaskId === null && $channel === "ai:response:{$taskId}");
|
||
|
||
if ($isOurTask) {
|
||
if (isset($event['response']) && !empty($event['response'])) {
|
||
error_log("[AI SSE] Sending response event via Predis for task {$taskId}");
|
||
ai_sse_debug_log("{$taskId}: JSON response via Predis");
|
||
|
||
$cacheStored = ai_sse_cache_response($taskId, $event['response'], $event['status'] ?? 'completed');
|
||
if ($cacheStored) {
|
||
error_log("[AI SSE] Cached JSON response via Predis for {$taskId}");
|
||
ai_sse_debug_log("{$taskId}: cached JSON via Predis");
|
||
} else {
|
||
error_log("[AI SSE] Failed to cache JSON response via Predis for {$taskId}");
|
||
}
|
||
|
||
sendSSE('response', [
|
||
'task_id' => $taskId,
|
||
'response' => $event['response']
|
||
]);
|
||
// Отписываемся после получения ответа
|
||
$pubsub->unsubscribe($channel);
|
||
break;
|
||
} elseif (isset($event['error']) && !empty($event['error'])) {
|
||
error_log("[AI SSE] Sending error event via Predis for task {$taskId}");
|
||
sendSSE('error', [
|
||
'task_id' => $taskId,
|
||
'error' => $event['error']
|
||
]);
|
||
// Отписываемся после получения ошибки
|
||
$pubsub->unsubscribe($channel);
|
||
break;
|
||
} else {
|
||
error_log("[AI SSE] Event received via Predis but no response/error field. Event keys: " . implode(', ', array_keys($event)));
|
||
// Если есть другие поля, попробуем найти ответ
|
||
$possibleResponse = $event['message'] ?? $event['text'] ?? $event['content'] ?? null;
|
||
if ($possibleResponse) {
|
||
error_log("[AI SSE] Found response in alternative field via Predis");
|
||
ai_sse_debug_log("{$taskId}: alternative response via Predis");
|
||
|
||
$cacheStored = ai_sse_cache_response($taskId, $possibleResponse);
|
||
if ($cacheStored) {
|
||
error_log("[AI SSE] Cached alternative response via Predis for {$taskId}");
|
||
ai_sse_debug_log("{$taskId}: cached alt response via Predis");
|
||
} else {
|
||
error_log("[AI SSE] Failed to cache alternative response via Predis for {$taskId}");
|
||
}
|
||
|
||
sendSSE('response', [
|
||
'task_id' => $taskId,
|
||
'response' => $possibleResponse
|
||
]);
|
||
$pubsub->unsubscribe($channel);
|
||
break;
|
||
}
|
||
}
|
||
} else {
|
||
error_log("[AI SSE] Event task_id mismatch via Predis: expected {$taskId}, got {$eventTaskId}");
|
||
}
|
||
}
|
||
|
||
// Проверяем не отключился ли клиент
|
||
if (connection_aborted()) {
|
||
$pubsub->unsubscribe($channel);
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
} catch (Exception $e) {
|
||
error_log("[AI SSE] Error: " . $e->getMessage());
|
||
sendSSE('error', ['message' => 'Redis error: ' . $e->getMessage()]);
|
||
}
|
||
?>
|
||
|