#!/usr/bin/env python3 """ Приоритетная обработка Санкт-Петербурга: 1. Создание chunks из hotel_website_processed 2. Генерация эмбеддингов через BGE-M3 API 3. Сохранение в hotel_website_chunks с metadata """ import psycopg2 from urllib.parse import unquote import requests import json import time import logging from typing import List, Dict, Tuple import uuid # Настройка логирования logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('embeddings_spb.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Конфигурация BGE_API_URL = "http://147.45.146.17:8002/embed" BGE_API_KEY = "22564b177aa73b6ac0b8642d7773350ff4c01d4983f028beff15ea247f09fa89" CHUNK_SIZE = 600 # Возвращаем обратно CHUNK_OVERLAP = 100 BATCH_SIZE = 8 MAX_RETRIES = 3 class EmbeddingProcessor: def __init__(self): self.conn = None self.cur = None self.connect_db() def connect_db(self): """Подключение к базе данных""" try: self.conn = psycopg2.connect( host='147.45.189.234', port=5432, database='default_db', user='gen_user', password=unquote('2~~9_%5EkVsU%3F2%5CS') ) self.conn.autocommit = True self.cur = self.conn.cursor() logger.info("✅ Подключение к БД установлено") except Exception as e: logger.error(f"❌ Ошибка подключения к БД: {e}") raise def get_hotel_info(self, hotel_id: str) -> Dict: """Получение информации об отеле из hotel_main""" try: self.cur.execute(""" SELECT id, full_name, region_name FROM hotel_main WHERE id = %s; """, (hotel_id,)) result = self.cur.fetchone() if result: return { 'hotel_id': result[0], 'hotel_name': result[1], 'region_name': result[2] } return None except Exception as e: logger.error(f"❌ Ошибка получения информации об отеле {hotel_id}: {e}") return None def create_chunks_from_text(self, text: str, hotel_id: str, url: str, raw_page_id: int) -> List[Dict]: """Создание chunks из текста""" if not text or len(text.strip()) < 50: return [] chunks = [] start = 0 while start < len(text): end = start + CHUNK_SIZE chunk_text = text[start:end] if end < len(text): last_period = chunk_text.rfind('.') last_newline = chunk_text.rfind('\n') break_point = max(last_period, last_newline) if break_point > start + CHUNK_SIZE // 2: chunk_text = text[start:start + break_point + 1] end = start + break_point + 1 chunks.append({ 'text': chunk_text.strip(), 'metadata': { 'hotel_id': str(hotel_id), 'url': url, 'page_id': raw_page_id, 'chunk_index': len(chunks), 'chunk_size': len(chunk_text.strip()) } }) start = end - CHUNK_OVERLAP if end < len(text) else end return chunks def get_embeddings(self, texts: List[str]) -> List[List[float]]: """Получение embeddings через BGE-M3 API (по одному тексту за раз)""" embeddings = [] for text in texts: for attempt in range(MAX_RETRIES): try: response = requests.post( BGE_API_URL, headers={ "Authorization": f"Bearer {BGE_API_KEY}", "Content-Type": "application/json" }, json={"text": text}, # Единственное число! timeout=30 ) if response.status_code == 200: data = response.json() # API возвращает {"embeddings": [[...]]} - берём первый элемент emb = data.get('embeddings', [[]])[0] if emb: embeddings.append(emb) break else: logger.warning(f"⚠️ API вернул код {response.status_code}, попытка {attempt + 1}/{MAX_RETRIES}") time.sleep(1) except Exception as e: logger.error(f"❌ Ошибка API (попытка {attempt + 1}/{MAX_RETRIES}): {e}") time.sleep(1) time.sleep(0.1) # Небольшая пауза между запросами return embeddings def save_chunks(self, chunks: List[Dict], embeddings: List[List[float]]): """Сохранение chunks с embeddings в БД""" if len(chunks) != len(embeddings): logger.error(f"❌ Несоответствие: {len(chunks)} chunks != {len(embeddings)} embeddings") return try: for chunk, embedding in zip(chunks, embeddings): chunk_id = str(uuid.uuid4()) self.cur.execute(""" INSERT INTO hotel_website_chunks (id, text, metadata, embedding) VALUES (%s, %s, %s, %s) """, ( chunk_id, chunk['text'], json.dumps(chunk['metadata']), embedding )) except Exception as e: logger.error(f"❌ Ошибка сохранения chunks: {e}") raise def get_spb_hotels_to_process(self) -> List[Tuple]: """Получение списка отелей Питера для обработки""" try: self.cur.execute(""" SELECT DISTINCT wr.hotel_id, hm.full_name FROM hotel_website_raw wr LEFT JOIN hotel_website_processed wp ON wr.id = wp.raw_page_id JOIN hotel_main hm ON wr.hotel_id = hm.id WHERE wp.id IS NULL AND hm.region_name = 'г. Санкт-Петербург' ORDER BY hm.full_name """) return self.cur.fetchall() except Exception as e: logger.error(f"❌ Ошибка получения списка отелей: {e}") return [] def process_hotel(self, hotel_id: str) -> Tuple[int, bool]: """Обработка одного отеля""" start_time = time.time() hotel_info = self.get_hotel_info(hotel_id) if not hotel_info: logger.error(f"❌ Не найдена информация об отеле {hotel_id}") return 0, False logger.info(f"🏨 Обрабатываем отель: {hotel_info['hotel_name'][:50]}...") # Получаем необработанные страницы self.cur.execute(""" SELECT wr.id, wr.url, wr.html, wr.hotel_id FROM hotel_website_raw wr LEFT JOIN hotel_website_processed wp ON wr.id = wp.raw_page_id WHERE wp.id IS NULL AND wr.hotel_id = %s ORDER BY wr.id """, (hotel_id,)) pages = self.cur.fetchall() logger.info(f" 📄 Найдено {len(pages)} страниц") total_chunks_saved = 0 for page_id, url, html, hotel_id in pages: # Упрощенная очистка HTML from html import unescape import re text = re.sub(r']*>.*?', '', html, flags=re.DOTALL) text = re.sub(r']*>.*?', '', text, flags=re.DOTALL) text = re.sub(r'<[^>]+>', ' ', text) text = unescape(text) text = re.sub(r'\s+', ' ', text).strip() # Создаем chunks chunks = self.create_chunks_from_text(text, hotel_id, url, page_id) if not chunks: continue # Обрабатываем батчами for i in range(0, len(chunks), BATCH_SIZE): batch = chunks[i:i + BATCH_SIZE] texts = [chunk['text'] for chunk in batch] logger.info(f" 🔄 Обрабатываем батч {i//BATCH_SIZE + 1}: {len(batch)} chunks") embeddings = self.get_embeddings(texts) if not embeddings: logger.error(f" ❌ Не удалось получить embeddings для батча") continue self.save_chunks(batch, embeddings) logger.info(f" ✅ Батч успешно обработан") # Отмечаем страницу как обработанную self.cur.execute(""" INSERT INTO hotel_website_processed (raw_page_id, hotel_id, url, cleaned_text, text_length, processed_at) VALUES (%s, %s, %s, %s, %s, NOW()) """, (page_id, hotel_id, url, text[:1000], len(text))) total_chunks_saved += len(chunks) logger.info(f"✅ Сохранено {len(chunks)} chunks для отеля {hotel_info['hotel_name'][:50]}...") logger.info(f" ✅ Страница {page_id}: {len(chunks)} chunks") elapsed = time.time() - start_time logger.info(f"🎉 Отель {hotel_info['hotel_name'][:50]}... обработан: {total_chunks_saved} chunks") logger.info(f"✅ Успешно за {elapsed:.2f} сек") return total_chunks_saved, True def run(self): """Основной цикл обработки""" logger.info("🚀 Запуск обработки САНКТ-ПЕТЕРБУРГА") hotels = self.get_spb_hotels_to_process() total_hotels = len(hotels) if not hotels: logger.info("✅ Все отели Питера уже обработаны!") return logger.info(f"📊 Найдено отелей к обработке: {total_hotels}") processed = 0 total_chunks = 0 for idx, (hotel_id, hotel_name) in enumerate(hotels, 1): logger.info(f"\n🔄 Обрабатываем отель {idx}/{total_hotels}: {hotel_id}") try: chunks_saved, success = self.process_hotel(hotel_id) if success: processed += 1 total_chunks += chunks_saved except Exception as e: logger.error(f"❌ Ошибка обработки отеля {hotel_id}: {e}") continue logger.info("\n" + "="*80) logger.info("🎉 ПИТЕР ОБРАБОТАН!") logger.info("="*80) logger.info(f"✅ Обработано отелей: {processed}/{total_hotels}") logger.info(f"📦 Создано chunks: {total_chunks:,}") logger.info("="*80) def close(self): """Закрытие соединений""" if self.cur: self.cur.close() if self.conn: self.conn.close() if __name__ == "__main__": processor = EmbeddingProcessor() try: processor.run() except KeyboardInterrupt: logger.info("\n⚠️ Прервано пользователем") except Exception as e: logger.error(f"❌ Критическая ошибка: {e}") finally: processor.close() logger.info("👋 Завершение работы")