Files
aiform_prod/test_redis_publish_direct.py

75 lines
2.2 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Тест прямой публикации в Redis (имитация n8n Redis ноды)
"""
import redis
import json
from datetime import datetime
print("=" * 60)
print("🧪 ТЕСТ ПРЯМОЙ ПУБЛИКАЦИИ В REDIS")
print("=" * 60)
# Подключение к Redis
r = redis.Redis(
host='crm.clientright.ru',
port=6379,
password='CRM_Redis_Pass_2025_Secure!',
decode_responses=True
)
# Проверка подключения
try:
r.ping()
print("✅ Redis подключен!")
except Exception as e:
print(f"❌ Ошибка подключения: {e}")
exit(1)
# Тестовые данные
claim_id = "CLM-TEST-DIRECT-123"
channel = f"ocr_events:{claim_id}"
event_data = {
"event_type": "ocr_completed",
"status": "success",
"message": "✅ Тест прямой публикации из Python (имитация n8n)",
"data": {
"file_id": "test-file-123",
"is_valid_document": True,
"test_mode": True,
"source": "direct_redis_publish"
},
"timestamp": datetime.now().isoformat()
}
message = json.dumps(event_data, ensure_ascii=False)
print(f"\n📺 Канал: {channel}")
print(f"📦 Сообщение:")
print(json.dumps(event_data, indent=2, ensure_ascii=False))
print("\n" + "=" * 60)
# Публикация
try:
num_subscribers = r.publish(channel, message)
print(f"\n✅ Сообщение опубликовано!")
print(f"👥 Количество подписчиков: {num_subscribers}")
if num_subscribers == 0:
print("\n⚠️ ВНИМАНИЕ: Нет активных подписчиков!")
print(" Это нормально, если никто не слушает канал.")
print(" Запусти monitor_redis_direct.py в другом терминале.")
else:
print(f"\n🎉 {num_subscribers} подписчик(ов) получили сообщение!")
except Exception as e:
print(f"\n❌ Ошибка публикации: {e}")
finally:
r.close()
print("\n" + "=" * 60)
print(f"⏰ Завершено: {datetime.now().strftime('%H:%M:%S')}")