Files
hotels/process_spb_embeddings_correct.py

357 lines
15 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Обработка chunks и embeddings только для Санкт-Петербурга
ИСПРАВЛЕННАЯ ВЕРСИЯ: берет данные из hotel_website_processed
"""
import psycopg2
from urllib.parse import unquote
import requests
import json
import time
import logging
from typing import List, Dict
import uuid
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('spb_embeddings_correct.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Конфигурация
BGE_API_URL = "http://147.45.146.17:8002/embed"
BGE_API_KEY = "22564b177aa73b6ac0b8642d7773350ff4c01d4983f028beff15ea247f09fa89"
CHUNK_SIZE = 600
CHUNK_OVERLAP = 100
BATCH_SIZE = 8
MAX_RETRIES = 3
class EmbeddingProcessor:
def __init__(self):
self.conn = None
self.cur = None
self.connect_db()
def connect_db(self):
"""Подключение к базе данных"""
try:
self.conn = psycopg2.connect(
host='147.45.189.234',
port=5432,
database='default_db',
user='gen_user',
password=unquote('2~~9_%5EkVsU%3F2%5CS')
)
self.conn.autocommit = False # Используем транзакции
self.cur = self.conn.cursor()
logger.info("✅ Подключение к БД установлено")
except Exception as e:
logger.error(f"❌ Ошибка подключения к БД: {e}")
raise
def get_hotel_info(self, hotel_id: str) -> Dict:
"""Получение информации об отеле из hotel_main"""
try:
self.cur.execute("""
SELECT id, full_name, region_name
FROM hotel_main
WHERE id = %s;
""", (hotel_id,))
result = self.cur.fetchone()
if result:
return {
'hotel_id': result[0],
'hotel_name': result[1],
'region_name': result[2]
}
return None
except Exception as e:
logger.error(f"❌ Ошибка получения информации об отеле {hotel_id}: {e}")
return None
def create_chunks_from_text(self, text: str, hotel_id: str, url: str, raw_page_id: int) -> List[Dict]:
"""Создание chunks из текста"""
if not text or len(text.strip()) < 50:
return []
chunks = []
start = 0
while start < len(text):
end = start + CHUNK_SIZE
chunk_text = text[start:end]
if end < len(text):
# Ищем хорошую точку разрыва (конец предложения)
last_period = chunk_text.rfind('.')
last_newline = chunk_text.rfind('\n')
break_point = max(last_period, last_newline)
if break_point > start + CHUNK_SIZE // 2:
chunk_text = text[start:start + break_point + 1]
end = start + break_point + 1
# Создаём metadata для chunk
chunk_metadata = {
'hotel_id': hotel_id,
'url': url,
'raw_page_id': raw_page_id,
'chunk_start': start,
'chunk_end': end,
'chunk_length': len(chunk_text)
}
chunks.append({
'id': str(uuid.uuid4()),
'text': chunk_text.strip(),
'metadata': chunk_metadata
})
# Следующий chunk с перекрытием
start = end - CHUNK_OVERLAP
if start >= len(text):
break
return chunks
def generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
"""Генерация эмбеддингов батчем через API с retry логикой"""
for attempt in range(MAX_RETRIES):
try:
headers = {
"X-API-Key": BGE_API_KEY,
"Content-Type": "application/json"
}
payload = {
"text": texts
}
timeout = 120 if len(texts) > 20 else 60
response = requests.post(BGE_API_URL, json=payload, headers=headers, timeout=timeout)
if response.status_code == 200:
result = response.json()
embeddings = result.get('embeddings', [])
if len(embeddings) == len(texts):
return embeddings
else:
logger.warning(f"⚠️ Неполный ответ API: {len(embeddings)}/{len(texts)} эмбеддингов")
if attempt < MAX_RETRIES - 1:
time.sleep(5)
continue
else:
logger.error(f"❌ Ошибка API: {response.status_code} - {response.text}")
if attempt < MAX_RETRIES - 1:
time.sleep(10)
continue
except requests.exceptions.Timeout:
logger.warning(f"⚠️ Таймаут API (попытка {attempt + 1}/{MAX_RETRIES})")
if attempt < MAX_RETRIES - 1:
time.sleep(10)
continue
except Exception as e:
logger.error(f"❌ Ошибка генерации эмбеддингов (попытка {attempt + 1}/{MAX_RETRIES}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(5)
continue
logger.error(f"Не удалось получить эмбеддинги после {MAX_RETRIES} попыток")
return []
def save_chunks_to_db(self, chunks: List[Dict], hotel_info: Dict):
"""Сохранение chunks в базу данных"""
try:
# Разбиваем chunks на батчи для API
all_embeddings = []
for i in range(0, len(chunks), BATCH_SIZE):
batch_chunks = chunks[i:i + BATCH_SIZE]
batch_texts = [chunk['text'] for chunk in batch_chunks]
logger.info(f" 🔄 Обрабатываем батч {i//BATCH_SIZE + 1}: {len(batch_texts)} chunks")
# Генерируем эмбеддинги для батча
batch_embeddings = self.generate_embeddings_batch(batch_texts)
if len(batch_embeddings) == len(batch_texts):
all_embeddings.extend(batch_embeddings)
logger.info(f" ✅ Батч успешно обработан")
else:
logger.error(f" ❌ Ошибка в батче: {len(batch_embeddings)}/{len(batch_texts)} эмбеддингов")
return False
# Небольшая пауза между батчами
if i + BATCH_SIZE < len(chunks):
time.sleep(1)
if len(all_embeddings) != len(chunks):
logger.error(f"❌ Количество эмбеддингов ({len(all_embeddings)}) не совпадает с количеством chunks ({len(chunks)})")
return False
# Обновляем metadata с информацией об отеле и сохраняем в БД
for i, chunk in enumerate(chunks):
chunk['metadata']['hotel_name'] = hotel_info['hotel_name']
chunk['metadata']['region_name'] = hotel_info['region_name']
# Сохраняем в БД
embedding_str = json.dumps(all_embeddings[i])
self.cur.execute("""
INSERT INTO hotel_website_chunks (id, text, metadata, embedding)
VALUES (%s, %s, %s, %s::vector)
ON CONFLICT (id) DO UPDATE SET
text = EXCLUDED.text,
metadata = EXCLUDED.metadata,
embedding = EXCLUDED.embedding;
""", (
chunk['id'],
chunk['text'],
json.dumps(chunk['metadata']),
embedding_str
))
self.conn.commit()
logger.info(f"✅ Сохранено {len(chunks)} chunks для отеля {hotel_info['hotel_name'][:50]}...")
return True
except Exception as e:
logger.error(f"❌ Ошибка сохранения chunks: {e}")
self.conn.rollback()
return False
def process_hotel(self, hotel_id: str) -> bool:
"""Обработка одного отеля - БЕРЕТ ДАННЫЕ ИЗ hotel_website_processed"""
try:
# Получаем информацию об отеле
hotel_info = self.get_hotel_info(hotel_id)
if not hotel_info:
logger.warning(f"⚠️ Отель {hotel_id} не найден в hotel_main")
return False
# ВАЖНО: Получаем страницы отеля из hotel_website_processed
self.cur.execute("""
SELECT id, url, cleaned_text
FROM hotel_website_processed
WHERE hotel_id = %s
AND cleaned_text IS NOT NULL
AND LENGTH(cleaned_text) > 50
ORDER BY id;
""", (hotel_id,))
pages = self.cur.fetchall()
logger.info(f"🏨 Обрабатываем отель: {hotel_info['hotel_name'][:50]}...")
logger.info(f" 📄 Найдено {len(pages)} страниц в hotel_website_processed")
if not pages:
logger.warning(f"⚠️ Нет обработанных страниц для отеля {hotel_id}")
return False
total_chunks = 0
for page_id, url, text in pages:
# Создаём chunks из ОЧИЩЕННОГО текста
chunks = self.create_chunks_from_text(text, str(hotel_id), url, page_id)
if chunks:
# Сохраняем chunks
if self.save_chunks_to_db(chunks, hotel_info):
total_chunks += len(chunks)
logger.info(f" ✅ Страница {page_id}: {len(chunks)} chunks")
else:
logger.error(f" ❌ Ошибка сохранения chunks для страницы {page_id}")
logger.info(f"🎉 Отель {hotel_info['hotel_name'][:50]}... обработан: {total_chunks} chunks")
return True
except Exception as e:
logger.error(f"❌ Ошибка обработки отеля {hotel_id}: {e}")
self.conn.rollback()
return False
def process_spb_region(self):
"""Обработка всех отелей Санкт-Петербурга из hotel_website_processed"""
try:
# ВАЖНО: Получаем отели СПБ из hotel_website_processed, у которых нет chunks
self.cur.execute("""
SELECT DISTINCT p.hotel_id, h.full_name
FROM hotel_website_processed p
INNER JOIN hotel_main h ON p.hotel_id = h.id
LEFT JOIN hotel_website_chunks c ON p.hotel_id::text = c.metadata->>'hotel_id'
WHERE h.region_name = 'г. Санкт-Петербург'
AND p.cleaned_text IS NOT NULL
AND LENGTH(p.cleaned_text) > 50
AND c.id IS NULL
ORDER BY h.full_name
""")
hotels = self.cur.fetchall()
logger.info(f"📊 Найдено {len(hotels)} отелей СПБ для обработки из hotel_website_processed")
if not hotels:
logger.info("Все отели СПБ уже обработаны!")
return
successful = 0
failed = 0
for i, (hotel_id, hotel_name) in enumerate(hotels, 1):
logger.info(f"\n🔄 Обрабатываем отель {i}/{len(hotels)}: {hotel_name}")
start_time = time.time()
if self.process_hotel(str(hotel_id)):
successful += 1
processing_time = time.time() - start_time
logger.info(f"✅ Успешно за {processing_time:.2f} сек")
else:
failed += 1
logger.error(f"❌ Ошибка обработки")
# Показываем прогресс каждые 10 отелей
if i % 10 == 0:
logger.info(f"\n📈 Прогресс: {i}/{len(hotels)} отелей")
logger.info(f" ✅ Успешно: {successful}")
logger.info(f" ❌ Ошибок: {failed}")
# Финальная статистика
logger.info(f"\n🎉 ОБРАБОТКА СПБ ЗАВЕРШЕНА!")
logger.info(f" ✅ Успешно: {successful}")
logger.info(f" ❌ Ошибок: {failed}")
except Exception as e:
logger.error(f"❌ Ошибка обработки региона: {e}")
def close(self):
"""Закрытие соединения с БД"""
if self.cur:
self.cur.close()
if self.conn:
self.conn.close()
def main():
logger.info("🚀 Запуск обработки Санкт-Петербурга (из hotel_website_processed)")
processor = EmbeddingProcessor()
try:
processor.process_spb_region()
logger.info("✅ Обработка завершена!")
finally:
processor.close()
if __name__ == "__main__":
main()