Files
hotels/scraper_missing.py

197 lines
6.5 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Парсер для ОСТАВШИХСЯ необработанных отелей
С автоматическим переподключением к БД
"""
import requests
import psycopg2
from psycopg2.extras import execute_batch, Json
import time
import logging
from datetime import datetime
from urllib.parse import unquote
from typing import Optional, Dict, List
import json
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'scraper_missing_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
DB_CONFIG = {
'host': "147.45.189.234",
'port': 5432,
'database': "default_db",
'user': "gen_user",
'password': unquote("2~~9_%5EkVsU%3F2%5CS")
}
API_BASE_URL = "https://tourism.fsa.gov.ru/api/v1"
RATE_LIMIT_DELAY = 0.1
BATCH_SIZE = 50
class MissingScraper:
def __init__(self, limit=None, offset=0):
self.limit = limit
self.offset = offset
self.conn = None
self.session = requests.Session()
self.processed = 0
self.errors = 0
def reconnect_db(self):
"""Переподключение к БД"""
if self.conn:
try:
self.conn.close()
except:
pass
self.conn = psycopg2.connect(**DB_CONFIG)
def get_missing_hotel_ids(self):
"""Получить ID необработанных отелей"""
self.reconnect_db()
cur = self.conn.cursor()
sql = """
SELECT m.id
FROM hotel_main m
LEFT JOIN hotel_raw_json r ON m.id = r.hotel_id
WHERE r.hotel_id IS NULL
ORDER BY m.id
"""
if self.limit:
sql += f" LIMIT {self.limit} OFFSET {self.offset}"
cur.execute(sql)
ids = [row[0] for row in cur.fetchall()]
cur.close()
return ids
def api_request(self, url: str) -> Optional[Dict]:
"""API запрос"""
time.sleep(RATE_LIMIT_DELAY)
try:
response = self.session.get(url, timeout=30)
response.raise_for_status()
return response.json()
except:
return None
def get_hotel_details(self, hotel_id: str) -> Dict:
"""Получить детали отеля"""
return {
'hotel_id': hotel_id,
'main': self.api_request(f"{API_BASE_URL}/resorts/hotels/{hotel_id}/main"),
'additional_info': self.api_request(f"{API_BASE_URL}/resorts/common/{hotel_id}/additional-info"),
'sanatorium': self.api_request(f"{API_BASE_URL}/resorts/hotels/{hotel_id}/sanatoriumDrawer"),
'drawer': self.api_request(f"{API_BASE_URL}/resorts/hotels/{hotel_id}/drawer")
}
def save_batch(self, batch: List[Dict]):
"""Сохранить батч с переподключением"""
if not batch:
return
# Переподключаемся перед каждым сохранением
self.reconnect_db()
cur = self.conn.cursor()
try:
# Сохраняем в hotel_raw_json
records = [(item['hotel_id'], Json(item['main']), Json(item['additional_info']),
Json(item['sanatorium']), Json(item['drawer'])) for item in batch]
sql = """
INSERT INTO hotel_raw_json
(hotel_id, main_data, additional_info, sanatorium_data, drawer_data)
VALUES (%s, %s, %s, %s, %s)
ON CONFLICT (hotel_id) DO UPDATE SET
main_data = EXCLUDED.main_data,
additional_info = EXCLUDED.additional_info,
sanatorium_data = EXCLUDED.sanatorium_data,
drawer_data = EXCLUDED.drawer_data
"""
execute_batch(cur, sql, records, page_size=BATCH_SIZE)
self.conn.commit()
logger.info(f"✓ Сохранено {len(batch)} отелей")
except Exception as e:
logger.error(f"Ошибка сохранения: {e}")
self.conn.rollback()
self.errors += len(batch)
finally:
cur.close()
def run(self):
"""Запуск"""
start = datetime.now()
logger.info(f"🚀 Запуск парсинга НЕОБРАБОТАННЫХ отелей")
# Получаем список необработанных
hotel_ids = self.get_missing_hotel_ids()
total = len(hotel_ids)
logger.info(f"📊 Необработанных отелей: {total}")
if total == 0:
logger.info("Все отели уже обработаны!")
return
batch = []
for idx, hotel_id in enumerate(hotel_ids, 1):
try:
details = self.get_hotel_details(hotel_id)
batch.append(details)
self.processed += 1
# Сохраняем батч
if len(batch) >= BATCH_SIZE:
self.save_batch(batch)
batch = []
# Прогресс
if idx % 100 == 0:
elapsed = (datetime.now() - start).total_seconds()
speed = self.processed / elapsed
eta_min = (total - idx) / speed / 60
logger.info(f"Progress: {idx}/{total} ({idx/total*100:.1f}%) | "
f"Speed: {speed:.1f}/sec | ETA: {eta_min:.0f} min")
except Exception as e:
logger.error(f"Ошибка обработки {hotel_id}: {e}")
self.errors += 1
# Остаток
if batch:
self.save_batch(batch)
elapsed = (datetime.now() - start).total_seconds()
logger.info(f"\n{'='*70}")
logger.info(f"Завершено: {self.processed}/{total}")
logger.info(f"Ошибок: {self.errors}")
logger.info(f"Время: {elapsed/60:.1f} минут")
logger.info(f"{'='*70}")
if __name__ == "__main__":
import sys
limit = int(sys.argv[1]) if len(sys.argv) > 1 else None
offset = int(sys.argv[2]) if len(sys.argv) > 2 else 0
scraper = MissingScraper(limit=limit, offset=offset)
scraper.run()