Files
hotels/universal_parser_api.py
Фёдор 684fada337 🚀 Full project sync: Hotels RAG & Audit System
 Major Features:
- Complete RAG system for hotel website analysis
- Hybrid audit with BGE-M3 embeddings + Natasha NER
- Universal horizontal Excel reports with dashboards
- Multi-region processing (SPb, Orel, Chukotka, Kamchatka)

📊 Completed Regions:
- Орловская область: 100% (36/36)
- Чукотский АО: 100% (4/4)
- г. Санкт-Петербург: 93% (893/960)
- Камчатский край: 87% (89/102)

🔧 Infrastructure:
- PostgreSQL with pgvector extension
- BGE-M3 embeddings API
- Browserless for web scraping
- N8N workflows for automation
- S3/Nextcloud file storage

📝 Documentation:
- Complete DB schemas
- API documentation
- Setup guides
- Status reports
2025-10-27 22:49:42 +03:00

362 lines
13 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
🕷️ УНИВЕРСАЛЬНЫЙ ПАРСЕР API
Обходит защиты сайтов (Cloudflare, WAF) и парсит любой контент
Endpoints:
- POST /parse - парсинг страницы
- GET /health - статус API
"""
from fastapi import FastAPI, HTTPException, Security, Depends
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel, HttpUrl
from typing import Optional, List
import asyncio
from playwright.async_api import async_playwright
from playwright_stealth import Stealth
import logging
from datetime import datetime
import secrets
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('parser_api.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# FastAPI приложение
app = FastAPI(
title="Universal Parser API",
description="Обход защит и парсинг любых сайтов через Playwright Stealth",
version="1.0.0"
)
# API ключ (сгенерирован случайно)
# ⚠️ В продакшене хранить в .env!
API_KEY = "parser_2025_secret_key_a8f3d9c1b4e7"
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=True)
async def verify_api_key(api_key: str = Security(api_key_header)):
"""Проверка API ключа"""
if api_key != API_KEY:
logger.warning(f"⚠️ Неверный API ключ: {api_key[:10]}...")
raise HTTPException(
status_code=403,
detail="Неверный API ключ"
)
return api_key
# Модели данных
class ParseRequest(BaseModel):
url: HttpUrl
wait_seconds: Optional[int] = 3
extract_links: Optional[bool] = False
screenshot: Optional[bool] = False
javascript_enabled: Optional[bool] = True
user_agent: Optional[str] = None
class Config:
json_schema_extra = {
"example": {
"url": "https://mos-sud.ru/312/cases/civil/details/...",
"wait_seconds": 5,
"extract_links": True,
"screenshot": False
}
}
class ParseResponse(BaseModel):
success: bool
url: str
status_code: int
title: str
html: str
text: str
text_length: int
links: Optional[List[str]] = []
screenshot_base64: Optional[str] = None
parsing_time: float
timestamp: str
error: Optional[str] = None
class HealthResponse(BaseModel):
status: str
version: str
timestamp: str
# Парсер
class UniversalParser:
"""Универсальный парсер с обходом защит"""
@staticmethod
async def parse(
url: str,
wait_seconds: int = 3,
extract_links: bool = False,
screenshot: bool = False,
javascript_enabled: bool = True,
user_agent: Optional[str] = None
) -> ParseResponse:
"""
Парсинг страницы с обходом защит
"""
start_time = asyncio.get_event_loop().time()
# Дефолтный User-Agent
if not user_agent:
user_agent = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
try:
async with async_playwright() as p:
# Запускаем браузер
browser = await p.chromium.launch(
headless=True,
args=[
'--disable-blink-features=AutomationControlled',
'--disable-dev-shm-usage',
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-web-security',
'--disable-features=IsolateOrigins,site-per-process'
]
)
# Контекст с продвинутыми настройками
context = await browser.new_context(
user_agent=user_agent,
viewport={'width': 1920, 'height': 1080},
locale='ru-RU',
timezone_id='Europe/Moscow',
color_scheme='light',
device_scale_factor=1,
has_touch=False,
is_mobile=False,
java_script_enabled=javascript_enabled,
extra_http_headers={
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'ru-RU,ru;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Cache-Control': 'max-age=0',
'DNT': '1'
}
)
page = await context.new_page()
# 🔥 ПРИМЕНЯЕМ STEALTH (обход детекции)
stealth = Stealth()
await stealth.apply_stealth_async(page)
# Дополнительные скрипты для маскировки
await page.add_init_script("""
// Скрываем webdriver
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
});
// Chrome runtime
window.chrome = {
runtime: {},
loadTimes: function() {},
csi: function() {}
};
// Plugins
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5]
});
// Languages
Object.defineProperty(navigator, 'languages', {
get: () => ['ru-RU', 'ru', 'en-US', 'en']
});
// Permissions
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
""")
logger.info(f"🌐 Загружаем: {url}")
# ФИКС: Сначала загружаем главную (получаем cookies и referer)
from urllib.parse import urlparse
parsed = urlparse(str(url))
base_url = f"{parsed.scheme}://{parsed.netloc}/"
# Шаг 1: Главная страница
logger.info(f"🏠 Загружаем главную: {base_url}")
await page.goto(base_url, wait_until='domcontentloaded', timeout=30000)
await page.wait_for_timeout(1000)
# Шаг 2: Целевая страница (теперь есть referer!)
logger.info(f"🎯 Переходим на целевую")
response = await page.goto(
url,
wait_until='domcontentloaded',
timeout=45000
)
status_code = response.status
logger.info(f"📊 Статус: {status_code}")
# Ждём дополнительную загрузку
await page.wait_for_timeout(wait_seconds * 1000)
# Получаем данные
title = await page.title()
html = await page.content()
text = await page.inner_text('body')
# Извлекаем ссылки
links = []
if extract_links:
links_elements = await page.query_selector_all('a[href]')
links = [await link.get_attribute('href') for link in links_elements]
links = [link for link in links if link] # Убираем None
# Скриншот
screenshot_base64 = None
if screenshot:
screenshot_bytes = await page.screenshot(full_page=False)
import base64
screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
await browser.close()
parsing_time = asyncio.get_event_loop().time() - start_time
logger.info(f"✅ Успешно спарсено: {len(text)} символов за {parsing_time:.2f}с")
return ParseResponse(
success=True,
url=str(url),
status_code=status_code,
title=title,
html=html,
text=text,
text_length=len(text),
links=links if extract_links else [],
screenshot_base64=screenshot_base64,
parsing_time=round(parsing_time, 2),
timestamp=datetime.now().isoformat()
)
except Exception as e:
logger.error(f"❌ Ошибка парсинга {url}: {e}")
parsing_time = asyncio.get_event_loop().time() - start_time
return ParseResponse(
success=False,
url=str(url),
status_code=0,
title="",
html="",
text="",
text_length=0,
parsing_time=round(parsing_time, 2),
timestamp=datetime.now().isoformat(),
error=str(e)
)
# API Endpoints
@app.get("/", tags=["Info"])
async def root():
"""Информация об API"""
return {
"name": "Universal Parser API",
"version": "1.0.0",
"description": "Обход защит и парсинг любых сайтов",
"endpoints": {
"POST /parse": "Парсинг страницы",
"GET /health": "Статус API"
},
"documentation": "/docs",
"author": "Your Team"
}
@app.get("/health", response_model=HealthResponse, tags=["Health"])
async def health():
"""Проверка статуса API"""
return HealthResponse(
status="healthy",
version="1.0.0",
timestamp=datetime.now().isoformat()
)
@app.post("/parse", response_model=ParseResponse, tags=["Parser"])
async def parse_page(
request: ParseRequest,
api_key: str = Depends(verify_api_key)
):
"""
Парсинг страницы с обходом защит
Требуется API ключ в заголовке: X-API-Key
Параметры:
- url: URL страницы для парсинга
- wait_seconds: Время ожидания после загрузки (по умолчанию 3)
- extract_links: Извлечь все ссылки (по умолчанию False)
- screenshot: Сделать скриншот (по умолчанию False)
- javascript_enabled: Включить JavaScript (по умолчанию True)
- user_agent: Кастомный User-Agent (опционально)
"""
logger.info(f"📥 Запрос на парсинг: {request.url}")
result = await UniversalParser.parse(
url=str(request.url),
wait_seconds=request.wait_seconds,
extract_links=request.extract_links,
screenshot=request.screenshot,
javascript_enabled=request.javascript_enabled,
user_agent=request.user_agent
)
return result
if __name__ == "__main__":
import uvicorn
logger.info("🚀 Запуск Universal Parser API")
logger.info(f"🔑 API Key: {API_KEY}")
logger.info("📝 Документация: http://localhost:8003/docs")
uvicorn.run(
app,
host="0.0.0.0",
port=8003,
log_level="info"
)