#!/usr/bin/env python3
"""
Приоритетная обработка Санкт-Петербурга:
1. Создание chunks из hotel_website_processed
2. Генерация эмбеддингов через BGE-M3 API
3. Сохранение в hotel_website_chunks с metadata
"""
import psycopg2
from urllib.parse import unquote
import requests
import json
import time
import logging
from typing import List, Dict, Tuple
import uuid
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('embeddings_spb.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Конфигурация
BGE_API_URL = "http://147.45.146.17:8002/embed"
BGE_API_KEY = "22564b177aa73b6ac0b8642d7773350ff4c01d4983f028beff15ea247f09fa89"
CHUNK_SIZE = 600 # Возвращаем обратно
CHUNK_OVERLAP = 100
BATCH_SIZE = 8
MAX_RETRIES = 3
class EmbeddingProcessor:
def __init__(self):
self.conn = None
self.cur = None
self.connect_db()
def connect_db(self):
"""Подключение к базе данных"""
try:
self.conn = psycopg2.connect(
host='147.45.189.234',
port=5432,
database='default_db',
user='gen_user',
password=unquote('2~~9_%5EkVsU%3F2%5CS')
)
self.conn.autocommit = True
self.cur = self.conn.cursor()
logger.info("✅ Подключение к БД установлено")
except Exception as e:
logger.error(f"❌ Ошибка подключения к БД: {e}")
raise
def get_hotel_info(self, hotel_id: str) -> Dict:
"""Получение информации об отеле из hotel_main"""
try:
self.cur.execute("""
SELECT id, full_name, region_name
FROM hotel_main
WHERE id = %s;
""", (hotel_id,))
result = self.cur.fetchone()
if result:
return {
'hotel_id': result[0],
'hotel_name': result[1],
'region_name': result[2]
}
return None
except Exception as e:
logger.error(f"❌ Ошибка получения информации об отеле {hotel_id}: {e}")
return None
def create_chunks_from_text(self, text: str, hotel_id: str, url: str, raw_page_id: int) -> List[Dict]:
"""Создание chunks из текста"""
if not text or len(text.strip()) < 50:
return []
chunks = []
start = 0
while start < len(text):
end = start + CHUNK_SIZE
chunk_text = text[start:end]
if end < len(text):
last_period = chunk_text.rfind('.')
last_newline = chunk_text.rfind('\n')
break_point = max(last_period, last_newline)
if break_point > start + CHUNK_SIZE // 2:
chunk_text = text[start:start + break_point + 1]
end = start + break_point + 1
chunks.append({
'text': chunk_text.strip(),
'metadata': {
'hotel_id': str(hotel_id),
'url': url,
'page_id': raw_page_id,
'chunk_index': len(chunks),
'chunk_size': len(chunk_text.strip())
}
})
start = end - CHUNK_OVERLAP if end < len(text) else end
return chunks
def get_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Получение embeddings через BGE-M3 API (по одному тексту за раз)"""
embeddings = []
for text in texts:
for attempt in range(MAX_RETRIES):
try:
response = requests.post(
BGE_API_URL,
headers={
"Authorization": f"Bearer {BGE_API_KEY}",
"Content-Type": "application/json"
},
json={"text": text}, # Единственное число!
timeout=30
)
if response.status_code == 200:
data = response.json()
# API возвращает {"embeddings": [[...]]} - берём первый элемент
emb = data.get('embeddings', [[]])[0]
if emb:
embeddings.append(emb)
break
else:
logger.warning(f"⚠️ API вернул код {response.status_code}, попытка {attempt + 1}/{MAX_RETRIES}")
time.sleep(1)
except Exception as e:
logger.error(f"❌ Ошибка API (попытка {attempt + 1}/{MAX_RETRIES}): {e}")
time.sleep(1)
time.sleep(0.1) # Небольшая пауза между запросами
return embeddings
def save_chunks(self, chunks: List[Dict], embeddings: List[List[float]]):
"""Сохранение chunks с embeddings в БД"""
if len(chunks) != len(embeddings):
logger.error(f"❌ Несоответствие: {len(chunks)} chunks != {len(embeddings)} embeddings")
return
try:
for chunk, embedding in zip(chunks, embeddings):
chunk_id = str(uuid.uuid4())
self.cur.execute("""
INSERT INTO hotel_website_chunks (id, text, metadata, embedding)
VALUES (%s, %s, %s, %s)
""", (
chunk_id,
chunk['text'],
json.dumps(chunk['metadata']),
embedding
))
except Exception as e:
logger.error(f"❌ Ошибка сохранения chunks: {e}")
raise
def get_spb_hotels_to_process(self) -> List[Tuple]:
"""Получение списка отелей Питера для обработки"""
try:
self.cur.execute("""
SELECT DISTINCT
wr.hotel_id,
hm.full_name
FROM hotel_website_raw wr
LEFT JOIN hotel_website_processed wp ON wr.id = wp.raw_page_id
JOIN hotel_main hm ON wr.hotel_id = hm.id
WHERE wp.id IS NULL
AND hm.region_name = 'г. Санкт-Петербург'
ORDER BY hm.full_name
""")
return self.cur.fetchall()
except Exception as e:
logger.error(f"❌ Ошибка получения списка отелей: {e}")
return []
def process_hotel(self, hotel_id: str) -> Tuple[int, bool]:
"""Обработка одного отеля"""
start_time = time.time()
hotel_info = self.get_hotel_info(hotel_id)
if not hotel_info:
logger.error(f"❌ Не найдена информация об отеле {hotel_id}")
return 0, False
logger.info(f"🏨 Обрабатываем отель: {hotel_info['hotel_name'][:50]}...")
# Получаем необработанные страницы
self.cur.execute("""
SELECT wr.id, wr.url, wr.html, wr.hotel_id
FROM hotel_website_raw wr
LEFT JOIN hotel_website_processed wp ON wr.id = wp.raw_page_id
WHERE wp.id IS NULL
AND wr.hotel_id = %s
ORDER BY wr.id
""", (hotel_id,))
pages = self.cur.fetchall()
logger.info(f" 📄 Найдено {len(pages)} страниц")
total_chunks_saved = 0
for page_id, url, html, hotel_id in pages:
# Упрощенная очистка HTML
from html import unescape
import re
text = re.sub(r'', '', html, flags=re.DOTALL)
text = re.sub(r'', '', text, flags=re.DOTALL)
text = re.sub(r'<[^>]+>', ' ', text)
text = unescape(text)
text = re.sub(r'\s+', ' ', text).strip()
# Создаем chunks
chunks = self.create_chunks_from_text(text, hotel_id, url, page_id)
if not chunks:
continue
# Обрабатываем батчами
for i in range(0, len(chunks), BATCH_SIZE):
batch = chunks[i:i + BATCH_SIZE]
texts = [chunk['text'] for chunk in batch]
logger.info(f" 🔄 Обрабатываем батч {i//BATCH_SIZE + 1}: {len(batch)} chunks")
embeddings = self.get_embeddings(texts)
if not embeddings:
logger.error(f" ❌ Не удалось получить embeddings для батча")
continue
self.save_chunks(batch, embeddings)
logger.info(f" ✅ Батч успешно обработан")
# Отмечаем страницу как обработанную
self.cur.execute("""
INSERT INTO hotel_website_processed
(raw_page_id, hotel_id, url, cleaned_text, text_length, processed_at)
VALUES (%s, %s, %s, %s, %s, NOW())
""", (page_id, hotel_id, url, text[:1000], len(text)))
total_chunks_saved += len(chunks)
logger.info(f"✅ Сохранено {len(chunks)} chunks для отеля {hotel_info['hotel_name'][:50]}...")
logger.info(f" ✅ Страница {page_id}: {len(chunks)} chunks")
elapsed = time.time() - start_time
logger.info(f"🎉 Отель {hotel_info['hotel_name'][:50]}... обработан: {total_chunks_saved} chunks")
logger.info(f"✅ Успешно за {elapsed:.2f} сек")
return total_chunks_saved, True
def run(self):
"""Основной цикл обработки"""
logger.info("🚀 Запуск обработки САНКТ-ПЕТЕРБУРГА")
hotels = self.get_spb_hotels_to_process()
total_hotels = len(hotels)
if not hotels:
logger.info("✅ Все отели Питера уже обработаны!")
return
logger.info(f"📊 Найдено отелей к обработке: {total_hotels}")
processed = 0
total_chunks = 0
for idx, (hotel_id, hotel_name) in enumerate(hotels, 1):
logger.info(f"\n🔄 Обрабатываем отель {idx}/{total_hotels}: {hotel_id}")
try:
chunks_saved, success = self.process_hotel(hotel_id)
if success:
processed += 1
total_chunks += chunks_saved
except Exception as e:
logger.error(f"❌ Ошибка обработки отеля {hotel_id}: {e}")
continue
logger.info("\n" + "="*80)
logger.info("🎉 ПИТЕР ОБРАБОТАН!")
logger.info("="*80)
logger.info(f"✅ Обработано отелей: {processed}/{total_hotels}")
logger.info(f"📦 Создано chunks: {total_chunks:,}")
logger.info("="*80)
def close(self):
"""Закрытие соединений"""
if self.cur:
self.cur.close()
if self.conn:
self.conn.close()
if __name__ == "__main__":
processor = EmbeddingProcessor()
try:
processor.run()
except KeyboardInterrupt:
logger.info("\n⚠️ Прервано пользователем")
except Exception as e:
logger.error(f"❌ Критическая ошибка: {e}")
finally:
processor.close()
logger.info("👋 Завершение работы")