# hotels/process_spb_robust.py
# NOTE(review): the lines above/below this header in the original paste were
# repository web-view chrome ("Files", "189 lines", "6.5 KiB", "Raw Permalink
# Normal View History") — commented out so the module parses.
#!/usr/bin/env python3
"""
Устойчивая версия обработки СПб с retry логикой
"""
import logging
import os
import re
import time
from datetime import datetime
from urllib.parse import unquote

import psycopg2
from bs4 import BeautifulSoup
# Конфигурация БД
DB_CONFIG = {
'host': "147.45.189.234",
'port': 5432,
'database': "default_db",
'user': "gen_user",
'password': unquote("2~~9_%5EkVsU%3F2%5CS"),
'connect_timeout': 10,
'keepalives_idle': 600,
'keepalives_interval': 30,
'keepalives_count': 3
}
# Логирование
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'spb_robust_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def clean_html(html: str) -> str:
"""Очистка HTML"""
soup = BeautifulSoup(html, 'html.parser')
for tag in soup(['script', 'style', 'meta', 'link', 'noscript']):
tag.decompose()
text = soup.get_text(separator=' ', strip=True)
text = re.sub(r'\s+', ' ', text)
return text.strip()
def get_connection():
"""Получить соединение с retry"""
for attempt in range(3):
try:
conn = psycopg2.connect(**DB_CONFIG)
return conn
except Exception as e:
logger.warning(f"Попытка {attempt + 1} подключения: {e}")
if attempt < 2:
time.sleep(5)
else:
raise
def process_spb_robust():
"""Обрабатываем СПб с устойчивостью к сбоям"""
conn = None
total_processed = 0
batch_size = 50 # Уменьшили размер пачки
try:
while True:
try:
# Получаем новое соединение для каждой итерации
if conn:
conn.close()
conn = get_connection()
cur = conn.cursor()
# Получаем следующую порцию
cur.execute('''
SELECT
r.id as raw_id,
r.hotel_id,
r.url,
r.html
FROM hotel_website_raw r
JOIN hotel_main h ON h.id = r.hotel_id
LEFT JOIN hotel_website_processed p ON p.hotel_id = r.hotel_id AND p.url = r.url
WHERE h.region_name = 'г. Санкт-Петербург'
AND p.hotel_id IS NULL
ORDER BY r.id
LIMIT %s
''', (batch_size,))
batch = cur.fetchall()
if not batch:
logger.info("🎉 Все данные обработаны!")
break
logger.info(f"📦 Обрабатываю пачку: {len(batch)} страниц")
# Обрабатываем пачку
for raw_id, hotel_id, url, html in batch:
try:
cleaned_text = clean_html(html)
text_length = len(cleaned_text)
# Вставляем в processed
cur.execute("""
INSERT INTO hotel_website_processed
(raw_page_id, hotel_id, url, cleaned_text, text_length, processed_at)
VALUES (%s, %s, %s, %s, %s, NOW())
ON CONFLICT (hotel_id, url) DO UPDATE SET
cleaned_text = EXCLUDED.cleaned_text,
text_length = EXCLUDED.text_length,
processed_at = EXCLUDED.processed_at
""", (raw_id, hotel_id, url, cleaned_text, text_length))
total_processed += 1
except Exception as e:
logger.error(f"❌ Ошибка обработки {hotel_id} {url}: {e}")
continue
# Коммитим пачку
conn.commit()
cur.close()
# Проверяем прогресс
cur = conn.cursor()
cur.execute('''
SELECT COUNT(*)
FROM hotel_website_raw r
JOIN hotel_main h ON h.id = r.hotel_id
LEFT JOIN hotel_website_processed p ON p.hotel_id = r.hotel_id AND p.url = r.url
WHERE h.region_name = 'г. Санкт-Петербург'
AND p.hotel_id IS NULL
''')
remaining = cur.fetchone()[0]
total_pages = total_processed + remaining
percent = total_processed / total_pages * 100 if total_pages > 0 else 0
logger.info(f"✅ Обработано: {total_processed}/{total_pages} ({percent:.1f}%)")
cur.close()
# Небольшая пауза между пачками
time.sleep(1)
except Exception as e:
logger.error(f"❌ Ошибка в цикле: {e}")
if conn:
try:
conn.close()
except:
pass
conn = None
time.sleep(10) # Пауза перед повтором
continue
# Финальная статистика
conn = get_connection()
cur = conn.cursor()
cur.execute('''
SELECT COUNT(DISTINCT p.hotel_id)
FROM hotel_website_processed p
JOIN hotel_main h ON h.id = p.hotel_id
WHERE h.region_name = 'г. Санкт-Петербург'
''')
processed_hotels = cur.fetchone()[0]
logger.info(f"🎉 ЗАВЕРШЕНО! Отелей СПб в processed: {processed_hotels}")
cur.close()
finally:
if conn:
conn.close()
if __name__ == "__main__":
process_spb_robust()