Files
hotels/memory_agent.py
Фёдор 0cf3297290 Проект аудита отелей: основные скрипты и документация
- Краулеры: smart_crawler.py, regional_crawler.py
- Аудит: audit_orel_to_excel.py, audit_chukotka_to_excel.py
- РКН проверка: check_rkn_registry.py, recheck_unclear_rkn.py
- Отчёты: create_orel_horizontal_report.py
- Обработка: process_all_hotels_embeddings.py
- Документация: README.md, DB_SCHEMA_REFERENCE.md
2025-10-16 10:52:09 +03:00

204 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Интеграция с MCP сервером памяти агента
"""
import requests
import json
from typing import Dict, List, Optional
import uuid
from datetime import datetime
class MemoryAgent:
"""Клиент для работы с MCP сервером памяти агента"""
def __init__(self, base_url: str = "http://185.197.75.249:9000"):
self.base_url = base_url
self.sse_url = f"{base_url}/sse"
def get_user_id(self, request) -> str:
"""Получить ID пользователя из IP адреса или других данных"""
# Получаем IP адрес
client_ip = request.headers.get('X-Forwarded-For',
request.headers.get('X-Real-IP',
request.client.host))
# Если IP через прокси, берем первый
if ',' in client_ip:
client_ip = client_ip.split(',')[0].strip()
# Создаем стабильный user_id на основе IP
user_id = f"user_{client_ip.replace('.', '_')}"
return user_id
def add_memory(self, user_id: str, content: str, source: str = "chat",
metadata: Optional[Dict] = None) -> Dict:
"""Добавить память в агента через MCP"""
payload = {
"name": f"Chat with {user_id}",
"episode_body": content,
"group_id": user_id, # Используем user_id как group_id
"source": source,
"source_description": f"Chat conversation with user {user_id}",
"metadata": metadata or {}
}
try:
# Используем правильный MCP эндпоинт
response = requests.post(
f"{self.base_url}/mcp_memory_add_memory",
json=payload,
timeout=30
)
if response.status_code == 200:
return {
"status": "success",
"data": response.json()
}
else:
return {
"status": "error",
"error": f"HTTP {response.status_code}: {response.text}"
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def search_memory(self, user_id: str, query: str, max_results: int = 10) -> Dict:
"""Поиск в памяти агента через MCP"""
payload = {
"query": query,
"group_ids": [user_id],
"max_facts": max_results
}
try:
response = requests.post(
f"{self.base_url}/mcp_memory_search_memory_facts",
json=payload,
timeout=30
)
if response.status_code == 200:
return {
"status": "success",
"data": response.json()
}
else:
return {
"status": "error",
"error": f"HTTP {response.status_code}: {response.text}"
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def get_user_history(self, user_id: str, last_n: int = 10) -> Dict:
"""Получить историю пользователя через MCP"""
payload = {
"group_id": user_id,
"last_n": last_n
}
try:
response = requests.post(
f"{self.base_url}/mcp_memory_get_episodes",
json=payload,
timeout=30
)
if response.status_code == 200:
return {
"status": "success",
"data": response.json()
}
else:
return {
"status": "error",
"error": f"HTTP {response.status_code}: {response.text}"
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
def test_connection(self) -> Dict:
"""Тест подключения к MCP серверу"""
try:
# Тестируем SSE эндпоинт
response = requests.get(f"{self.sse_url}", timeout=10)
if response.status_code == 200:
return {
"status": "success",
"message": "MCP сервер доступен"
}
else:
return {
"status": "error",
"error": f"HTTP {response.status_code}"
}
except Exception as e:
return {
"status": "error",
"error": f"Не удается подключиться к MCP серверу: {str(e)}"
}
# Глобальный экземпляр
memory_agent = MemoryAgent()
if __name__ == "__main__":
print("=" * 70)
print("🧠 ТЕСТ MCP СЕРВЕРА ПАМЯТИ АГЕНТА")
print("=" * 70)
# Тест подключения
print("🔗 Тестирую подключение...")
result = memory_agent.test_connection()
print(f"Результат: {result}")
if result["status"] == "success":
print("\n✅ MCP сервер доступен!")
# Тест добавления памяти
test_user = "user_192_168_1_100"
test_content = "Пользователь спрашивал про отели в Чукотке. Ответил что там 12 отелей, 4 с сайтами."
print(f"\n📝 Добавляю тестовую память для {test_user}...")
add_result = memory_agent.add_memory(test_user, test_content)
print(f"Результат: {add_result}")
if add_result["status"] == "success":
print("\n✅ Память добавлена!")
# Тест поиска
print(f"\n🔍 Ищу память по запросу 'отели чукотка'...")
search_result = memory_agent.search_memory(test_user, "отели чукотка")
print(f"Результат: {search_result}")
# Тест истории
print(f"\n📚 Получаю историю пользователя...")
history_result = memory_agent.get_user_history(test_user)
print(f"Результат: {history_result}")
else:
print(f"\n❌ Ошибка: {result['error']}")
print("=" * 70)