Files
hotels/website_crawler_db.py

444 lines
17 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Crawler для парсинга сайтов отелей с сохранением в PostgreSQL
- Сохраняет сырой HTML (для будущей переобработки)
- Сохраняет очищенный текст
- Извлекает структурированные данные
"""
import asyncio
import json
import logging
import re
import psycopg2
from psycopg2.extras import Json
from datetime import datetime
from typing import List, Dict, Set, Optional
from urllib.parse import urljoin, urlparse, unquote
from playwright.async_api import async_playwright, Page
from bs4 import BeautifulSoup, Comment
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'crawler_db_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Конфигурация БД
DB_CONFIG = {
'host': "147.45.189.234",
'port': 5432,
'database': "default_db",
'user': "gen_user",
'password': unquote("2~~9_%5EkVsU%3F2%5CS")
}
# Конфигурация краулинга
MAX_PAGES_PER_SITE = 20
PAGE_TIMEOUT = 45000
NAVIGATION_TIMEOUT = 40000
class TextCleaner:
"""Продвинутая очистка HTML с сохранением важных данных"""
# Теги для удаления (только мусор!)
REMOVE_TAGS = ['script', 'style', 'noscript']
# Классы/ID для удаления (только явная реклама)
REMOVE_PATTERNS = ['advertisement', 'ad-banner', 'google-ad']
# ВАЖНЫЕ классы/ID которые НЕ удаляем (контакты!)
KEEP_PATTERNS = ['contact', 'phone', 'email', 'address', 'footer', 'info', 'about']
@staticmethod
def clean_html(html: str) -> str:
"""Бережная очистка HTML - сохраняем контакты и важные данные"""
soup = BeautifulSoup(html, 'html.parser')
# 1. Удаляем комментарии
for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
comment.extract()
# 2. Удаляем только явный мусор (скрипты, стили)
for tag_name in TextCleaner.REMOVE_TAGS:
for tag in soup.find_all(tag_name):
tag.decompose()
# 3. Удаляем только явную рекламу (но проверяем, чтобы не было важных данных)
for pattern in TextCleaner.REMOVE_PATTERNS:
for tag in soup.find_all(class_=re.compile(pattern, re.I)):
# Проверяем, нет ли там важных данных
tag_text = tag.get_text().lower()
has_important = any(kw in tag_text for kw in ['телефон', 'email', 'адрес', '@', '+7', '8-'])
if not has_important:
tag.decompose()
# 4. Извлекаем текст (с переносами строк для читаемости)
text = soup.get_text(separator='\n', strip=True)
# 5. Убираем лишние пробелы, но сохраняем структуру
lines = [line.strip() for line in text.split('\n') if line.strip()]
text = '\n'.join(lines)
# 6. Убираем повторяющиеся переносы
text = re.sub(r'\n{3,}', '\n\n', text)
return text.strip()
@staticmethod
def extract_structured_data(html: str, text: str) -> Dict:
"""Извлечь структурированные данные"""
data = {
'phones': [],
'emails': [],
'inn': [],
'ogrn': [],
'addresses': []
}
# Телефоны
phones = re.findall(r'\+?[78][\s\-]?\(?(\d{3})\)?[\s\-]?(\d{3})[\s\-]?(\d{2})[\s\-]?(\d{2})', text)
data['phones'] = list(set([''.join(p) for p in phones]))[:10]
# Email
emails = re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text)
data['emails'] = list(set(emails))[:10]
# ИНН (10 или 12 цифр, контекст "ИНН")
inn_matches = re.findall(r'ИНН[:\s]*(\d{10}|\d{12})', text, re.IGNORECASE)
data['inn'] = list(set(inn_matches))[:3]
# ОГРН (13 или 15 цифр, контекст "ОГРН")
ogrn_matches = re.findall(r'ОГРН[:\s]*(\d{13}|\d{15})', text, re.IGNORECASE)
data['ogrn'] = list(set(ogrn_matches))[:3]
# Адреса (упрощенно - строки с "адрес:", "г.", "ул.")
address_patterns = [
r'[Аа]дрес[:\s]+([^\n]{20,150})',
r'г\.\s*[А-Я][а\-]+[,\s]+ул\.\s*[^\n]{10,100}'
]
for pattern in address_patterns:
addresses = re.findall(pattern, text)
data['addresses'].extend(addresses[:3])
data['addresses'] = list(set(data['addresses']))[:5]
return data
class WebsiteCrawlerDB:
"""Crawler с сохранением в PostgreSQL"""
def __init__(self, hotel_id: str, hotel_name: str, website: str):
self.hotel_id = hotel_id
self.hotel_name = hotel_name
self.website = self.normalize_url(website)
self.domain = self.extract_domain(self.website)
self.visited_urls: Set[str] = set()
self.pages_data: List[Dict] = []
self.cleaner = TextCleaner()
self.conn = None
self.start_time = None
@staticmethod
def normalize_url(url: str) -> str:
"""Нормализация URL"""
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
return url.rstrip('/')
@staticmethod
def extract_domain(url: str) -> str:
"""Извлечь домен"""
parsed = urlparse(url)
return parsed.netloc.lower()
def is_internal_link(self, url: str) -> bool:
"""Проверка внутренней ссылки"""
try:
parsed = urlparse(url)
link_domain = parsed.netloc.lower()
return (link_domain == self.domain or
link_domain.endswith('.' + self.domain) or
self.domain.endswith('.' + link_domain))
except:
return False
def connect_db(self):
"""Подключение к БД"""
self.conn = psycopg2.connect(**DB_CONFIG)
logger.info(" ✓ Подключено к PostgreSQL")
def init_meta(self):
"""Инициализация метаинформации"""
cur = self.conn.cursor()
cur.execute("""
INSERT INTO hotel_website_meta
(hotel_id, domain, main_url, crawl_status, crawl_started_at)
VALUES (%s, %s, %s, 'in_progress', %s)
ON CONFLICT (hotel_id) DO UPDATE SET
crawl_status = 'in_progress',
crawl_started_at = EXCLUDED.crawl_started_at,
updated_at = CURRENT_TIMESTAMP
""", (self.hotel_id, self.domain, self.website, self.start_time))
self.conn.commit()
cur.close()
def save_page(self, url: str, title: str, html: str, status_code: int,
response_time: int, depth: int, cleaned_text: str, structured_data: Dict):
"""Сохранить страницу в БД"""
cur = self.conn.cursor()
try:
# Сохраняем сырой HTML
cur.execute("""
INSERT INTO hotel_website_raw
(hotel_id, url, page_title, html, status_code, response_time_ms, depth)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (hotel_id, url) DO UPDATE SET
html = EXCLUDED.html,
page_title = EXCLUDED.page_title,
status_code = EXCLUDED.status_code,
response_time_ms = EXCLUDED.response_time_ms,
crawled_at = CURRENT_TIMESTAMP
RETURNING id
""", (self.hotel_id, url, title, html, status_code, response_time, depth))
raw_page_id = cur.fetchone()[0]
# Сохраняем обработанный текст
cur.execute("""
INSERT INTO hotel_website_processed
(raw_page_id, hotel_id, url, cleaned_text, extracted_data,
has_forms, has_booking, text_length)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
""", (raw_page_id, self.hotel_id, url, cleaned_text, Json(structured_data),
structured_data.get('has_forms', False),
structured_data.get('has_booking', False),
len(cleaned_text)))
self.conn.commit()
except Exception as e:
logger.error(f"Ошибка сохранения страницы {url}: {e}")
self.conn.rollback()
finally:
cur.close()
def update_meta(self, status: str, error_msg: Optional[str] = None):
"""Обновить метаинформацию"""
cur = self.conn.cursor()
total_size = sum(len(p.get('html', '')) for p in self.pages_data)
cur.execute("""
UPDATE hotel_website_meta SET
pages_crawled = %s,
total_size_bytes = %s,
crawl_status = %s,
crawl_finished_at = %s,
error_message = %s,
updated_at = CURRENT_TIMESTAMP
WHERE hotel_id = %s
""", (len(self.pages_data), total_size, status, datetime.now(), error_msg, self.hotel_id))
self.conn.commit()
cur.close()
async def extract_page_data(self, page: Page, url: str, depth: int) -> Optional[Dict]:
"""Извлечь данные со страницы"""
start_time = datetime.now()
try:
title = await page.title()
html = await page.content()
# Очищаем текст
cleaned_text = self.cleaner.clean_html(html)
# Извлекаем структурированные данные
structured_data = self.cleaner.extract_structured_data(html, cleaned_text)
# Извлекаем ссылки
links = await page.evaluate("""
() => Array.from(document.querySelectorAll('a[href]'))
.map(a => a.href)
.filter(href => href && !href.startsWith('mailto:') && !href.startsWith('tel:'))
""")
# Проверки
structured_data['has_forms'] = await page.evaluate("() => document.querySelectorAll('form').length > 0")
structured_data['has_booking'] = 'бронирован' in cleaned_text.lower() or 'booking' in cleaned_text.lower()
response_time = int((datetime.now() - start_time).total_seconds() * 1000)
page_data = {
'url': url,
'title': title,
'html': html,
'cleaned_text': cleaned_text,
'structured_data': structured_data,
'links': list(set(links)),
'status_code': 200,
'response_time': response_time,
'depth': depth,
'text_length': len(cleaned_text)
}
# Сохраняем в БД
self.save_page(
url, title, html, 200, response_time, depth,
cleaned_text, structured_data
)
return page_data
except Exception as e:
logger.error(f"Ошибка извлечения данных с {url}: {e}")
return None
async def crawl_page(self, page: Page, url: str, depth: int = 0):
"""Парсинг одной страницы"""
if url in self.visited_urls or len(self.visited_urls) >= MAX_PAGES_PER_SITE:
return
# Пропускаем PDF и файлы
if url.lower().endswith(('.pdf', '.doc', '.docx', '.zip', '.jpg', '.png')):
return
try:
logger.info(f" Парсинг (depth={depth}): {url[:80]}...")
# Загружаем страницу
try:
await page.goto(url, wait_until='domcontentloaded', timeout=NAVIGATION_TIMEOUT)
await page.wait_for_timeout(2000)
except Exception as e:
logger.warning(f" Пробуем load вместо domcontentloaded")
await page.goto(url, wait_until='load', timeout=NAVIGATION_TIMEOUT)
await page.wait_for_timeout(1000)
self.visited_urls.add(url)
# Извлекаем и сохраняем данные
page_data = await self.extract_page_data(page, url, depth)
if page_data:
self.pages_data.append(page_data)
logger.info(f" ✓ Сохранено {page_data['text_length']} символов в БД")
# Парсим внутренние ссылки (только для depth=0)
if depth == 0 and page_data.get('links'):
internal_links = [
link for link in page_data['links']
if self.is_internal_link(link) and link not in self.visited_urls
]
logger.info(f" Найдено {len(internal_links)} внутренних ссылок")
# Парсим depth=1
for link in internal_links[:MAX_PAGES_PER_SITE - 1]:
if len(self.visited_urls) >= MAX_PAGES_PER_SITE:
break
await self.crawl_page(page, link, depth=1)
except Exception as e:
logger.error(f" ✗ Ошибка парсинга {url}: {e}")
async def crawl(self) -> bool:
"""Запуск парсинга сайта"""
self.start_time = datetime.now()
logger.info(f"\n{'='*70}")
logger.info(f"🏨 {self.hotel_name[:60]}")
logger.info(f"🌐 {self.website}")
logger.info(f"{'='*70}")
self.connect_db()
self.init_meta()
try:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
# Парсим главную
await self.crawl_page(page, self.website, depth=0)
await browser.close()
logger.info(f"✓ Спарсено {len(self.pages_data)} страниц")
# Обновляем метаинформацию
self.update_meta('completed')
return len(self.pages_data) > 0
except Exception as e:
logger.error(f"✗ Критическая ошибка: {e}")
self.update_meta('failed', str(e))
return False
finally:
if self.conn:
self.conn.close()
async def main():
"""Главная функция"""
import sys
hotels_file = sys.argv[1] if len(sys.argv) > 1 else 'test_single_hotel.json'
with open(hotels_file, 'r', encoding='utf-8') as f:
hotels = json.load(f)
logger.info(f"\n{'='*70}")
logger.info(f"🚀 ЗАПУСК КРАУЛИНГА С СОХРАНЕНИЕМ В POSTGRESQL")
logger.info(f"📊 Отелей: {len(hotels)}")
logger.info(f"💾 Таблицы: hotel_website_raw, hotel_website_meta")
logger.info(f"{'='*70}\n")
success_count = 0
error_count = 0
for idx, hotel in enumerate(hotels, 1):
logger.info(f"\n[{idx}/{len(hotels)}] ====================================")
try:
crawler = WebsiteCrawlerDB(
hotel_id=hotel['id'],
hotel_name=hotel['name'],
website=hotel['website']
)
if await crawler.crawl():
success_count += 1
else:
error_count += 1
# Задержка между отелями
await asyncio.sleep(3)
except Exception as e:
logger.error(f"✗ Ошибка для отеля {hotel['name']}: {e}")
error_count += 1
logger.info(f"\n{'='*70}")
logger.info(f"📊 ИТОГИ:")
logger.info(f" ✅ Успешно: {success_count}/{len(hotels)}")
logger.info(f" ✗ Ошибки: {error_count}/{len(hotels)}")
logger.info(f"{'='*70}\n")
if __name__ == "__main__":
asyncio.run(main())