Files
aiform_dev/test_redis_publish_direct.py
AI Assistant 9084d75103 feat: Пошаговая загрузка документов с модалкой на Step 2
🎯 Изменения:
- Документы загружаются по очереди (один за другим)
- После загрузки каждого документа открывается модалка с крутилкой
- SSE слушает конкретный event_type: {file_type}_processed
- Модалка показывает результат распознавания с извлечёнными данными
- Кнопка 'Продолжить' → переход к следующему документу
- Опциональные документы можно пропустить
- После обработки всех обязательных → 'Далее на Step 3'

📊 UX флоу:
1. Выбор типа события → показываются нужные документы
2. Документ 1: Выбрать файл → Загрузить → Модалка → Результат → Продолжить
3. Документ 2: Выбрать файл → Загрузить → Модалка → Результат → Продолжить
4. Документ 3 (опц): Загрузить ИЛИ Пропустить
5. Все обязательные обработаны → Далее на Step 3

🔑 Каждый документ получает свой уникальный event_type:
- frontend отправляет file_type
- n8n возвращает event_type = {file_type}_processed
- frontend слушает этот конкретный event_type через SSE
2025-10-28 12:43:38 +03:00

76 lines
2.2 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Тест прямой публикации в Redis (имитация n8n Redis ноды)
"""
import redis
import json
from datetime import datetime
print("=" * 60)
print("🧪 ТЕСТ ПРЯМОЙ ПУБЛИКАЦИИ В REDIS")
print("=" * 60)
# Подключение к Redis
r = redis.Redis(
host='crm.clientright.ru',
port=6379,
password='CRM_Redis_Pass_2025_Secure!',
decode_responses=True
)
# Проверка подключения
try:
r.ping()
print("✅ Redis подключен!")
except Exception as e:
print(f"❌ Ошибка подключения: {e}")
exit(1)
# Тестовые данные
claim_id = "CLM-TEST-DIRECT-123"
channel = f"ocr_events:{claim_id}"
event_data = {
"event_type": "ocr_completed",
"status": "success",
"message": "✅ Тест прямой публикации из Python (имитация n8n)",
"data": {
"file_id": "test-file-123",
"is_valid_document": True,
"test_mode": True,
"source": "direct_redis_publish"
},
"timestamp": datetime.now().isoformat()
}
message = json.dumps(event_data, ensure_ascii=False)
print(f"\n📺 Канал: {channel}")
print(f"📦 Сообщение:")
print(json.dumps(event_data, indent=2, ensure_ascii=False))
print("\n" + "=" * 60)
# Публикация
try:
num_subscribers = r.publish(channel, message)
print(f"\n✅ Сообщение опубликовано!")
print(f"👥 Количество подписчиков: {num_subscribers}")
if num_subscribers == 0:
print("\n⚠️ ВНИМАНИЕ: Нет активных подписчиков!")
print(" Это нормально, если никто не слушает канал.")
print(" Запусти monitor_redis_direct.py в другом терминале.")
else:
print(f"\n🎉 {num_subscribers} подписчик(ов) получили сообщение!")
except Exception as e:
print(f"\n❌ Ошибка публикации: {e}")
finally:
r.close()
print("\n" + "=" * 60)
print(f"⏰ Завершено: {datetime.now().strftime('%H:%M:%S')}")