262 lines
11 KiB
Python
262 lines
11 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
Обработка chunks и embeddings только для Санкт-Петербурга
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import psycopg2
|
|||
|
|
from urllib.parse import unquote
|
|||
|
|
import requests
|
|||
|
|
import json
|
|||
|
|
import time
|
|||
|
|
import logging
|
|||
|
|
from typing import List, Dict, Tuple
|
|||
|
|
import uuid
|
|||
|
|
|
|||
|
|
# Настройка логирования
|
|||
|
|
logging.basicConfig(
|
|||
|
|
level=logging.INFO,
|
|||
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|||
|
|
handlers=[
|
|||
|
|
logging.FileHandler('spb_embeddings.log'),
|
|||
|
|
logging.StreamHandler()
|
|||
|
|
]
|
|||
|
|
)
|
|||
|
|
logger = logging.getLogger(__name__)
|
|||
|
|
|
|||
|
|
# Конфигурация
|
|||
|
|
BGE_API_URL = "http://147.45.146.17:8002/embed"
|
|||
|
|
BGE_API_KEY = "22564b177aa73b6ac0b8642d7773350ff4c01d4983f028beff15ea247f09fa89"
|
|||
|
|
CHUNK_SIZE = 600
|
|||
|
|
CHUNK_OVERLAP = 100
|
|||
|
|
BATCH_SIZE = 8
|
|||
|
|
MAX_RETRIES = 3
|
|||
|
|
|
|||
|
|
class EmbeddingProcessor:
|
|||
|
|
def __init__(self):
|
|||
|
|
self.conn = None
|
|||
|
|
self.cur = None
|
|||
|
|
self.connect_db()
|
|||
|
|
|
|||
|
|
def connect_db(self):
|
|||
|
|
"""Подключение к базе данных"""
|
|||
|
|
try:
|
|||
|
|
self.conn = psycopg2.connect(
|
|||
|
|
host='147.45.189.234',
|
|||
|
|
port=5432,
|
|||
|
|
database='default_db',
|
|||
|
|
user='gen_user',
|
|||
|
|
password=unquote('2~~9_%5EkVsU%3F2%5CS')
|
|||
|
|
)
|
|||
|
|
self.cur = self.conn.cursor()
|
|||
|
|
logger.info("✅ Подключение к БД установлено")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ Ошибка подключения к БД: {e}")
|
|||
|
|
raise
|
|||
|
|
|
|||
|
|
def create_chunks(self, text: str) -> List[str]:
|
|||
|
|
"""Создание chunks из текста"""
|
|||
|
|
if not text or len(text.strip()) < 50:
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
chunks = []
|
|||
|
|
start = 0
|
|||
|
|
|
|||
|
|
while start < len(text):
|
|||
|
|
end = start + CHUNK_SIZE
|
|||
|
|
|
|||
|
|
if end >= len(text):
|
|||
|
|
chunks.append(text[start:].strip())
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
# Ищем ближайший пробел или перенос строки
|
|||
|
|
while end > start and text[end] not in [' ', '\n', '\t']:
|
|||
|
|
end -= 1
|
|||
|
|
|
|||
|
|
if end == start: # Если не нашли пробел, берем по символам
|
|||
|
|
end = start + CHUNK_SIZE
|
|||
|
|
|
|||
|
|
chunk = text[start:end].strip()
|
|||
|
|
if chunk:
|
|||
|
|
chunks.append(chunk)
|
|||
|
|
|
|||
|
|
start = end - CHUNK_OVERLAP
|
|||
|
|
|
|||
|
|
return chunks
|
|||
|
|
|
|||
|
|
def get_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
|
|||
|
|
"""Получение эмбеддингов для батча текстов"""
|
|||
|
|
for attempt in range(MAX_RETRIES):
|
|||
|
|
try:
|
|||
|
|
response = requests.post(
|
|||
|
|
BGE_API_URL,
|
|||
|
|
headers={
|
|||
|
|
'X-API-Key': BGE_API_KEY,
|
|||
|
|
'Content-Type': 'application/json'
|
|||
|
|
},
|
|||
|
|
json={'text': texts},
|
|||
|
|
timeout=30
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
if response.status_code == 200:
|
|||
|
|
result = response.json()
|
|||
|
|
return result.get('embeddings', [])
|
|||
|
|
else:
|
|||
|
|
logger.warning(f"⚠️ API вернул статус {response.status_code}: {response.text}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.warning(f"⚠️ Попытка {attempt + 1} неудачна: {e}")
|
|||
|
|
if attempt < MAX_RETRIES - 1:
|
|||
|
|
time.sleep(2 ** attempt) # Экспоненциальная задержка
|
|||
|
|
|
|||
|
|
logger.error(f"❌ Не удалось получить эмбеддинги для батча из {len(texts)} текстов")
|
|||
|
|
return []
|
|||
|
|
|
|||
|
|
def process_hotel(self, hotel_id: str) -> bool:
|
|||
|
|
"""Обработка одного отеля - по странице за раз"""
|
|||
|
|
try:
|
|||
|
|
# Удаляем старые chunks сразу
|
|||
|
|
self.cur.execute("DELETE FROM hotel_website_chunks WHERE metadata->>'hotel_id' = %s", (hotel_id,))
|
|||
|
|
|
|||
|
|
# Получаем только ID страниц
|
|||
|
|
self.cur.execute("""
|
|||
|
|
SELECT id FROM hotel_website_raw
|
|||
|
|
WHERE hotel_id = %s
|
|||
|
|
AND html IS NOT NULL
|
|||
|
|
ORDER BY id
|
|||
|
|
""", (hotel_id,))
|
|||
|
|
|
|||
|
|
page_ids = [row[0] for row in self.cur.fetchall()]
|
|||
|
|
if not page_ids:
|
|||
|
|
logger.warning(f"⚠️ Нет HTML для отеля {hotel_id}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
logger.info(f"📄 Найдено {len(page_ids)} страниц для отеля")
|
|||
|
|
|
|||
|
|
import uuid
|
|||
|
|
import re
|
|||
|
|
total_chunks_saved = 0
|
|||
|
|
|
|||
|
|
# Обрабатываем каждую страницу отдельно
|
|||
|
|
for page_idx, page_id in enumerate(page_ids):
|
|||
|
|
logger.info(f" 📄 Обработка страницы {page_idx + 1}/{len(page_ids)}")
|
|||
|
|
|
|||
|
|
# Загружаем только ОДНУ страницу
|
|||
|
|
self.cur.execute("SELECT html FROM hotel_website_raw WHERE id = %s", (page_id,))
|
|||
|
|
html = self.cur.fetchone()[0]
|
|||
|
|
|
|||
|
|
# Очищаем HTML простой регуляркой (БЕЗ BeautifulSoup - экономия памяти!)
|
|||
|
|
# Удаляем script и style теги
|
|||
|
|
text = re.sub(r'<script[^>]*>.*?</script>', ' ', html, flags=re.DOTALL | re.IGNORECASE)
|
|||
|
|
text = re.sub(r'<style[^>]*>.*?</style>', ' ', text, flags=re.DOTALL | re.IGNORECASE)
|
|||
|
|
# Удаляем все HTML теги
|
|||
|
|
text = re.sub(r'<[^>]+>', ' ', text)
|
|||
|
|
# Декодируем HTML entities
|
|||
|
|
import html as html_module
|
|||
|
|
text = html_module.unescape(text)
|
|||
|
|
# Убираем лишние пробелы
|
|||
|
|
text = re.sub(r'\s+', ' ', text).strip()
|
|||
|
|
|
|||
|
|
# Освобождаем память сразу
|
|||
|
|
del html
|
|||
|
|
|
|||
|
|
# Создаем chunks из этой страницы
|
|||
|
|
page_chunks = self.create_chunks(text)
|
|||
|
|
del text
|
|||
|
|
|
|||
|
|
if not page_chunks:
|
|||
|
|
logger.info(f" ⚠️ Нет chunks на странице {page_idx + 1}")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
logger.info(f" 📄 Создано {len(page_chunks)} chunks")
|
|||
|
|
|
|||
|
|
# Обрабатываем chunks батчами
|
|||
|
|
for i in range(0, len(page_chunks), BATCH_SIZE):
|
|||
|
|
batch = page_chunks[i:i + BATCH_SIZE]
|
|||
|
|
logger.info(f" 🔄 Батч {i//BATCH_SIZE + 1}: {len(batch)} chunks")
|
|||
|
|
embeddings = self.get_embeddings_batch(batch)
|
|||
|
|
if not embeddings:
|
|||
|
|
logger.error(f" ❌ Не удалось получить эмбеддинги")
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# Сохраняем сразу
|
|||
|
|
for j, (chunk, embedding) in enumerate(zip(batch, embeddings)):
|
|||
|
|
chunk_id = str(uuid.uuid4())
|
|||
|
|
metadata = {
|
|||
|
|
'hotel_id': str(hotel_id),
|
|||
|
|
'chunk_index': total_chunks_saved,
|
|||
|
|
'page_id': page_id,
|
|||
|
|
'created_at': __import__('time').time()
|
|||
|
|
}
|
|||
|
|
self.cur.execute("""
|
|||
|
|
INSERT INTO hotel_website_chunks (id, text, metadata, embedding)
|
|||
|
|
VALUES (%s, %s, %s, %s::vector)
|
|||
|
|
""", (chunk_id, chunk, __import__('json').dumps(metadata), __import__('json').dumps(embedding)))
|
|||
|
|
total_chunks_saved += 1
|
|||
|
|
|
|||
|
|
# Освобождаем память после каждой страницы
|
|||
|
|
del page_chunks
|
|||
|
|
self.conn.commit()
|
|||
|
|
|
|||
|
|
logger.info(f"✅ Сохранено {total_chunks_saved} chunks для отеля")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ Ошибка обработки отеля {hotel_id}: {e}")
|
|||
|
|
self.conn.rollback()
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def process_orel_region(self):
|
|||
|
|
"""Обработка всех отелей Орловской области"""
|
|||
|
|
try:
|
|||
|
|
# Получаем отели Орловской области с HTML но без chunks
|
|||
|
|
self.cur.execute("""
|
|||
|
|
SELECT DISTINCT h.id, h.full_name
|
|||
|
|
FROM hotel_main h
|
|||
|
|
INNER JOIN hotel_website_raw hwr ON h.id = hwr.hotel_id
|
|||
|
|
LEFT JOIN hotel_website_chunks hc ON h.id::text = hc.metadata->>'hotel_id'
|
|||
|
|
WHERE h.region_name = 'г. Санкт-Петербург'
|
|||
|
|
AND hwr.html IS NOT NULL
|
|||
|
|
AND hc.id IS NULL
|
|||
|
|
ORDER BY h.full_name
|
|||
|
|
""")
|
|||
|
|
|
|||
|
|
hotels = self.cur.fetchall()
|
|||
|
|
logger.info(f"📊 Найдено {len(hotels)} отелей для обработки")
|
|||
|
|
|
|||
|
|
if not hotels:
|
|||
|
|
logger.info("✅ Все отели Орловской области уже обработаны!")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
for i, (hotel_id, hotel_name) in enumerate(hotels, 1):
|
|||
|
|
logger.info(f"🔄 Обрабатываем отель {i}/{len(hotels)}: {hotel_name}")
|
|||
|
|
success = self.process_hotel(hotel_id)
|
|||
|
|
if success:
|
|||
|
|
logger.info(f"✅ Отель {hotel_name} обработан успешно")
|
|||
|
|
else:
|
|||
|
|
logger.error(f"❌ Ошибка обработки отеля {hotel_name}")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"❌ Ошибка обработки региона: {e}")
|
|||
|
|
|
|||
|
|
def close(self):
|
|||
|
|
"""Закрытие соединения с БД"""
|
|||
|
|
if self.cur:
|
|||
|
|
self.cur.close()
|
|||
|
|
if self.conn:
|
|||
|
|
self.conn.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
logger.info("🚀 Запуск обработки Орловской области")
|
|||
|
|
|
|||
|
|
processor = EmbeddingProcessor()
|
|||
|
|
try:
|
|||
|
|
processor.process_orel_region()
|
|||
|
|
logger.info("✅ Обработка завершена!")
|
|||
|
|
finally:
|
|||
|
|
processor.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|