Files
hotels/create_spb_processed.py

387 lines
15 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
ЭТАП 1: Создание hotel_website_processed для Питера через Browserless Scrape
HTML Browserless cleaned_text hotel_website_processed
"""
import psycopg2
from psycopg2.extras import RealDictCursor
from urllib.parse import unquote
import requests
import json
import time
import logging
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import queue
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'spb_processed_creation_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Конфигурация БД
DB_CONFIG = {
'host': '147.45.189.234',
'port': 5432,
'database': 'default_db',
'user': 'gen_user',
'password': unquote('2~~9_%5EkVsU%3F2%5CS')
}
# Browserless API
BROWSERLESS_URL = "http://147.45.146.17:3000/function?token=9ahhnpjkchxtcho9"
# Многопоточность
MAX_WORKERS = 5 # Количество потоков для Browserless
class SpbProcessor:
def __init__(self):
self.conn = None
self.cur = None
self.connect_db()
def connect_db(self):
"""Подключение к БД"""
try:
self.conn = psycopg2.connect(**DB_CONFIG, cursor_factory=RealDictCursor)
self.conn.autocommit = True
self.cur = self.conn.cursor()
logger.info("✅ Подключение к БД установлено")
except Exception as e:
logger.error(f"❌ Ошибка подключения к БД: {e}")
raise
def create_processed_table(self):
"""Создание таблицы hotel_website_processed если не существует"""
try:
self.cur.execute("""
CREATE TABLE IF NOT EXISTS hotel_website_processed (
id SERIAL PRIMARY KEY,
hotel_id UUID NOT NULL,
url TEXT,
cleaned_text TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (hotel_id) REFERENCES hotel_main(id)
);
""")
# Создаем индекс
self.cur.execute("""
CREATE INDEX IF NOT EXISTS idx_hotel_website_processed_hotel_id
ON hotel_website_processed(hotel_id);
""")
logger.info("✅ Таблица hotel_website_processed готова")
except Exception as e:
logger.error(f"❌ Ошибка создания таблицы: {e}")
raise
def clean_html_with_browserless(self, html: str, max_retries: int = 3) -> str:
"""Очистка HTML через Browserless Scrape API с retry логикой"""
# JavaScript функция для извлечения текста
scrape_function = """
export default async function ({ page, context }) {
const html = context.html;
// Устанавливаем HTML в страницу
await page.setContent(html);
// Извлекаем весь текст
const text = await page.evaluate(() => {
// Удаляем script и style элементы
const scripts = document.querySelectorAll('script, style');
scripts.forEach(el => el.remove());
// Получаем весь текст
return document.body.innerText || document.body.textContent || '';
});
return {
text: text,
length: text.length
};
}
"""
payload = {
"code": scrape_function,
"context": {"html": html}
}
for attempt in range(max_retries):
try:
response = requests.post(BROWSERLESS_URL, json=payload, timeout=30)
response.raise_for_status()
result = response.json()
if result and 'text' in result:
return result['text']
return ""
except Exception as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Экспоненциальная задержка
logger.warning(f"⚠️ Попытка {attempt + 1}/{max_retries} не удалась, ждём {wait_time}с: {e}")
time.sleep(wait_time)
else:
logger.error(f"Все попытки исчерпаны для Browserless API: {e}")
return ""
return ""
def process_page(self, page_data: dict) -> dict:
"""Обработка одной страницы (для многопоточности)"""
page_id = page_data['id']
url = page_data['url']
html = page_data['html']
hotel_id = page_data['hotel_id']
try:
# Очищаем HTML через Browserless
cleaned_text = self.clean_html_with_browserless(html)
if cleaned_text and len(cleaned_text.strip()) > 50:
return {
'success': True,
'page_id': page_id,
'hotel_id': hotel_id,
'url': url,
'cleaned_text': cleaned_text,
'length': len(cleaned_text)
}
else:
return {
'success': False,
'page_id': page_id,
'hotel_id': hotel_id,
'error': 'Слишком короткий текст'
}
except Exception as e:
return {
'success': False,
'page_id': page_id,
'hotel_id': hotel_id,
'error': str(e)
}
def process_hotel_pages(self, hotel_id: str) -> int:
"""Обработка всех страниц одного отеля (многопоточно)"""
try:
# Получаем HTML страницы отеля
self.cur.execute("""
SELECT id, url, html
FROM hotel_website_raw
WHERE hotel_id = %s
AND html IS NOT NULL
ORDER BY id
""", (hotel_id,))
pages = self.cur.fetchall()
if not pages:
logger.warning(f"⚠️ Нет HTML для отеля {hotel_id}")
return 0
logger.info(f"📄 Найдено {len(pages)} страниц для отеля")
# Подготавливаем данные для многопоточности
page_data_list = []
for page in pages:
page_data_list.append({
'id': page['id'],
'url': page['url'],
'html': page['html'],
'hotel_id': hotel_id
})
processed_count = 0
# Многопоточная обработка страниц
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# Отправляем задачи
future_to_page = {
executor.submit(self.process_page, page_data): page_data
for page_data in page_data_list
}
# Обрабатываем результаты
for future in as_completed(future_to_page):
result = future.result()
if result['success']:
# Сохраняем в hotel_website_processed
self.cur.execute("""
INSERT INTO hotel_website_processed (hotel_id, url, cleaned_text)
VALUES (%s, %s, %s)
ON CONFLICT DO NOTHING
""", (result['hotel_id'], result['url'], result['cleaned_text']))
processed_count += 1
logger.info(f" ✅ Страница {result['page_id']}: {result['length']} символов")
else:
logger.warning(f" ⚠️ Страница {result['page_id']}: {result['error']}")
logger.info(f"✅ Отель обработан: {processed_count}/{len(pages)} страниц")
return processed_count
except Exception as e:
logger.error(f"❌ Ошибка обработки отеля {hotel_id}: {e}")
return 0
def get_spb_hotels(self):
"""Получение списка отелей Питера для обработки"""
try:
self.cur.execute("""
SELECT DISTINCT h.id, h.full_name
FROM hotel_main h
INNER JOIN hotel_website_raw hwr ON h.id = hwr.hotel_id
WHERE h.region_name = 'г. Санкт-Петербург'
AND hwr.html IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM hotel_website_processed hwp
WHERE hwp.hotel_id = h.id
)
ORDER BY h.id
""")
hotels = self.cur.fetchall()
logger.info(f"📊 Найдено {len(hotels)} отелей для обработки")
return hotels
except Exception as e:
logger.error(f"❌ Ошибка получения списка отелей: {e}")
return []
def get_stats(self):
"""Получение статистики"""
try:
# Всего отелей с HTML
self.cur.execute("""
SELECT COUNT(DISTINCT h.id)
FROM hotel_main h
INNER JOIN hotel_website_raw hwr ON h.id = hwr.hotel_id
WHERE h.region_name = 'г. Санкт-Петербург'
AND hwr.html IS NOT NULL
""")
total_hotels = self.cur.fetchone()[0]
# Обработанных отелей
self.cur.execute("""
SELECT COUNT(DISTINCT hotel_id)
FROM hotel_website_processed hwp
WHERE EXISTS (
SELECT 1 FROM hotel_main h
WHERE h.id = hwp.hotel_id
AND h.region_name = 'г. Санкт-Петербург'
)
""")
processed_hotels = self.cur.fetchone()[0]
# Всего страниц обработано
self.cur.execute("""
SELECT COUNT(*)
FROM hotel_website_processed hwp
WHERE EXISTS (
SELECT 1 FROM hotel_main h
WHERE h.id = hwp.hotel_id
AND h.region_name = 'г. Санкт-Петербург'
)
""")
processed_pages = self.cur.fetchone()[0]
return {
'total_hotels': total_hotels,
'processed_hotels': processed_hotels,
'processed_pages': processed_pages
}
except Exception as e:
logger.error(f"❌ Ошибка получения статистики: {e}")
return {}
def close(self):
"""Закрытие соединения"""
if self.cur:
self.cur.close()
if self.conn:
self.conn.close()
def main():
"""Основная функция"""
logger.info("🚀 ЭТАП 1: Создание hotel_website_processed для Питера")
logger.info("🌐 Используем Browserless Scrape для лучшего качества")
logger.info(f"⚡ Многопоточность: {MAX_WORKERS} потоков")
processor = SpbProcessor()
try:
# Создаем таблицу
processor.create_processed_table()
# Получаем статистику
stats = processor.get_stats()
logger.info(f"📊 Статистика:")
logger.info(f" Всего отелей: {stats.get('total_hotels', 0)}")
logger.info(f" Обработано отелей: {stats.get('processed_hotels', 0)}")
logger.info(f" Обработано страниц: {stats.get('processed_pages', 0)}")
# Получаем список отелей для обработки
hotels = processor.get_spb_hotels()
if not hotels:
logger.info("Все отели уже обработаны!")
return
logger.info(f"🔄 Начинаем обработку {len(hotels)} отелей...")
total_processed = 0
start_time = time.time()
for i, hotel in enumerate(hotels, 1):
hotel_id = hotel['id']
hotel_name = hotel['full_name']
logger.info(f"🏨 [{i}/{len(hotels)}] {hotel_name[:50]}...")
processed = processor.process_hotel_pages(hotel_id)
total_processed += processed
# Обновляем статистику каждые 10 отелей
if i % 10 == 0:
stats = processor.get_stats()
elapsed = time.time() - start_time
rate = i / elapsed * 3600 # отелей в час
logger.info(f"📈 Прогресс: {i}/{len(hotels)} отелей")
logger.info(f"⏱️ Скорость: {rate:.1f} отелей/час")
logger.info(f"📊 Обработано страниц: {stats.get('processed_pages', 0)}")
# Финальная статистика
elapsed = time.time() - start_time
stats = processor.get_stats()
logger.info("=" * 60)
logger.info("✅ ЭТАП 1 ЗАВЕРШЁН!")
logger.info(f" Время: {elapsed/3600:.1f} часов")
logger.info(f" Обработано отелей: {stats.get('processed_hotels', 0)}")
logger.info(f" Обработано страниц: {stats.get('processed_pages', 0)}")
logger.info("=" * 60)
except Exception as e:
logger.error(f"❌ Критическая ошибка: {e}")
finally:
processor.close()
if __name__ == "__main__":
main()