Files
hotels/process_spb_priority.py
Фёдор 684fada337 🚀 Full project sync: Hotels RAG & Audit System
 Major Features:
- Complete RAG system for hotel website analysis
- Hybrid audit with BGE-M3 embeddings + Natasha NER
- Universal horizontal Excel reports with dashboards
- Multi-region processing (SPb, Orel, Chukotka, Kamchatka)

📊 Completed Regions:
- Орловская область: 100% (36/36)
- Чукотский АО: 100% (4/4)
- г. Санкт-Петербург: 93% (893/960)
- Камчатский край: 87% (89/102)

🔧 Infrastructure:
- PostgreSQL with pgvector extension
- BGE-M3 embeddings API
- Browserless for web scraping
- N8N workflows for automation
- S3/Nextcloud file storage

📝 Documentation:
- Complete DB schemas
- API documentation
- Setup guides
- Status reports
2025-10-27 22:49:42 +03:00

329 lines
13 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Приоритетная обработка Санкт-Петербурга:
1. Создание chunks из hotel_website_processed
2. Генерация эмбеддингов через BGE-M3 API
3. Сохранение в hotel_website_chunks с metadata
"""
import psycopg2
from urllib.parse import unquote
import requests
import json
import time
import logging
from typing import List, Dict, Tuple
import uuid
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('embeddings_spb.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Конфигурация
BGE_API_URL = "http://147.45.146.17:8002/embed"
BGE_API_KEY = "22564b177aa73b6ac0b8642d7773350ff4c01d4983f028beff15ea247f09fa89"
CHUNK_SIZE = 600 # Возвращаем обратно
CHUNK_OVERLAP = 100
BATCH_SIZE = 8
MAX_RETRIES = 3
class EmbeddingProcessor:
def __init__(self):
self.conn = None
self.cur = None
self.connect_db()
def connect_db(self):
"""Подключение к базе данных"""
try:
self.conn = psycopg2.connect(
host='147.45.189.234',
port=5432,
database='default_db',
user='gen_user',
password=unquote('2~~9_%5EkVsU%3F2%5CS')
)
self.conn.autocommit = True
self.cur = self.conn.cursor()
logger.info("✅ Подключение к БД установлено")
except Exception as e:
logger.error(f"❌ Ошибка подключения к БД: {e}")
raise
def get_hotel_info(self, hotel_id: str) -> Dict:
"""Получение информации об отеле из hotel_main"""
try:
self.cur.execute("""
SELECT id, full_name, region_name
FROM hotel_main
WHERE id = %s;
""", (hotel_id,))
result = self.cur.fetchone()
if result:
return {
'hotel_id': result[0],
'hotel_name': result[1],
'region_name': result[2]
}
return None
except Exception as e:
logger.error(f"❌ Ошибка получения информации об отеле {hotel_id}: {e}")
return None
def create_chunks_from_text(self, text: str, hotel_id: str, url: str, raw_page_id: int) -> List[Dict]:
"""Создание chunks из текста"""
if not text or len(text.strip()) < 50:
return []
chunks = []
start = 0
while start < len(text):
end = start + CHUNK_SIZE
chunk_text = text[start:end]
if end < len(text):
last_period = chunk_text.rfind('.')
last_newline = chunk_text.rfind('\n')
break_point = max(last_period, last_newline)
if break_point > start + CHUNK_SIZE // 2:
chunk_text = text[start:start + break_point + 1]
end = start + break_point + 1
chunks.append({
'text': chunk_text.strip(),
'metadata': {
'hotel_id': str(hotel_id),
'url': url,
'page_id': raw_page_id,
'chunk_index': len(chunks),
'chunk_size': len(chunk_text.strip())
}
})
start = end - CHUNK_OVERLAP if end < len(text) else end
return chunks
def get_embeddings(self, texts: List[str]) -> List[List[float]]:
"""Получение embeddings через BGE-M3 API (по одному тексту за раз)"""
embeddings = []
for text in texts:
for attempt in range(MAX_RETRIES):
try:
response = requests.post(
BGE_API_URL,
headers={
"Authorization": f"Bearer {BGE_API_KEY}",
"Content-Type": "application/json"
},
json={"text": text}, # Единственное число!
timeout=30
)
if response.status_code == 200:
data = response.json()
# API возвращает {"embeddings": [[...]]} - берём первый элемент
emb = data.get('embeddings', [[]])[0]
if emb:
embeddings.append(emb)
break
else:
logger.warning(f"⚠️ API вернул код {response.status_code}, попытка {attempt + 1}/{MAX_RETRIES}")
time.sleep(1)
except Exception as e:
logger.error(f"❌ Ошибка API (попытка {attempt + 1}/{MAX_RETRIES}): {e}")
time.sleep(1)
time.sleep(0.1) # Небольшая пауза между запросами
return embeddings
def save_chunks(self, chunks: List[Dict], embeddings: List[List[float]]):
"""Сохранение chunks с embeddings в БД"""
if len(chunks) != len(embeddings):
logger.error(f"❌ Несоответствие: {len(chunks)} chunks != {len(embeddings)} embeddings")
return
try:
for chunk, embedding in zip(chunks, embeddings):
chunk_id = str(uuid.uuid4())
self.cur.execute("""
INSERT INTO hotel_website_chunks (id, text, metadata, embedding)
VALUES (%s, %s, %s, %s)
""", (
chunk_id,
chunk['text'],
json.dumps(chunk['metadata']),
embedding
))
except Exception as e:
logger.error(f"❌ Ошибка сохранения chunks: {e}")
raise
def get_spb_hotels_to_process(self) -> List[Tuple]:
"""Получение списка отелей Питера для обработки"""
try:
self.cur.execute("""
SELECT DISTINCT
wr.hotel_id,
hm.full_name
FROM hotel_website_raw wr
LEFT JOIN hotel_website_processed wp ON wr.id = wp.raw_page_id
JOIN hotel_main hm ON wr.hotel_id = hm.id
WHERE wp.id IS NULL
AND hm.region_name = 'г. Санкт-Петербург'
ORDER BY hm.full_name
""")
return self.cur.fetchall()
except Exception as e:
logger.error(f"❌ Ошибка получения списка отелей: {e}")
return []
def process_hotel(self, hotel_id: str) -> Tuple[int, bool]:
"""Обработка одного отеля"""
start_time = time.time()
hotel_info = self.get_hotel_info(hotel_id)
if not hotel_info:
logger.error(f"Не найдена информация об отеле {hotel_id}")
return 0, False
logger.info(f"🏨 Обрабатываем отель: {hotel_info['hotel_name'][:50]}...")
# Получаем необработанные страницы
self.cur.execute("""
SELECT wr.id, wr.url, wr.html, wr.hotel_id
FROM hotel_website_raw wr
LEFT JOIN hotel_website_processed wp ON wr.id = wp.raw_page_id
WHERE wp.id IS NULL
AND wr.hotel_id = %s
ORDER BY wr.id
""", (hotel_id,))
pages = self.cur.fetchall()
logger.info(f" 📄 Найдено {len(pages)} страниц")
total_chunks_saved = 0
for page_id, url, html, hotel_id in pages:
# Упрощенная очистка HTML
from html import unescape
import re
text = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL)
text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL)
text = re.sub(r'<[^>]+>', ' ', text)
text = unescape(text)
text = re.sub(r'\s+', ' ', text).strip()
# Создаем chunks
chunks = self.create_chunks_from_text(text, hotel_id, url, page_id)
if not chunks:
continue
# Обрабатываем батчами
for i in range(0, len(chunks), BATCH_SIZE):
batch = chunks[i:i + BATCH_SIZE]
texts = [chunk['text'] for chunk in batch]
logger.info(f" 🔄 Обрабатываем батч {i//BATCH_SIZE + 1}: {len(batch)} chunks")
embeddings = self.get_embeddings(texts)
if not embeddings:
logger.error(f"Не удалось получить embeddings для батча")
continue
self.save_chunks(batch, embeddings)
logger.info(f" ✅ Батч успешно обработан")
# Отмечаем страницу как обработанную
self.cur.execute("""
INSERT INTO hotel_website_processed
(raw_page_id, hotel_id, url, cleaned_text, text_length, processed_at)
VALUES (%s, %s, %s, %s, %s, NOW())
""", (page_id, hotel_id, url, text[:1000], len(text)))
total_chunks_saved += len(chunks)
logger.info(f"✅ Сохранено {len(chunks)} chunks для отеля {hotel_info['hotel_name'][:50]}...")
logger.info(f" ✅ Страница {page_id}: {len(chunks)} chunks")
elapsed = time.time() - start_time
logger.info(f"🎉 Отель {hotel_info['hotel_name'][:50]}... обработан: {total_chunks_saved} chunks")
logger.info(f"✅ Успешно за {elapsed:.2f} сек")
return total_chunks_saved, True
def run(self):
"""Основной цикл обработки"""
logger.info("🚀 Запуск обработки САНКТ-ПЕТЕРБУРГА")
hotels = self.get_spb_hotels_to_process()
total_hotels = len(hotels)
if not hotels:
logger.info("Все отели Питера уже обработаны!")
return
logger.info(f"📊 Найдено отелей к обработке: {total_hotels}")
processed = 0
total_chunks = 0
for idx, (hotel_id, hotel_name) in enumerate(hotels, 1):
logger.info(f"\n🔄 Обрабатываем отель {idx}/{total_hotels}: {hotel_id}")
try:
chunks_saved, success = self.process_hotel(hotel_id)
if success:
processed += 1
total_chunks += chunks_saved
except Exception as e:
logger.error(f"❌ Ошибка обработки отеля {hotel_id}: {e}")
continue
logger.info("\n" + "="*80)
logger.info("🎉 ПИТЕР ОБРАБОТАН!")
logger.info("="*80)
logger.info(f"✅ Обработано отелей: {processed}/{total_hotels}")
logger.info(f"📦 Создано chunks: {total_chunks:,}")
logger.info("="*80)
def close(self):
"""Закрытие соединений"""
if self.cur:
self.cur.close()
if self.conn:
self.conn.close()
if __name__ == "__main__":
processor = EmbeddingProcessor()
try:
processor.run()
except KeyboardInterrupt:
logger.info("\n⚠️ Прервано пользователем")
except Exception as e:
logger.error(f"❌ Критическая ошибка: {e}")
finally:
processor.close()
logger.info("👋 Завершение работы")