Files
hotels/process_spb_region.py
Фёдор 684fada337 🚀 Full project sync: Hotels RAG & Audit System
 Major Features:
- Complete RAG system for hotel website analysis
- Hybrid audit with BGE-M3 embeddings + Natasha NER
- Universal horizontal Excel reports with dashboards
- Multi-region processing (SPb, Orel, Chukotka, Kamchatka)

📊 Completed Regions:
- Орловская область: 100% (36/36)
- Чукотский АО: 100% (4/4)
- г. Санкт-Петербург: 93% (893/960)
- Камчатский край: 87% (89/102)

🔧 Infrastructure:
- PostgreSQL with pgvector extension
- BGE-M3 embeddings API
- Browserless for web scraping
- N8N workflows for automation
- S3/Nextcloud file storage

📝 Documentation:
- Complete DB schemas
- API documentation
- Setup guides
- Status reports
2025-10-27 22:49:42 +03:00

349 lines
13 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Processing script for Saint Petersburg:
1. Chunking of all crawled hotels
2. Auditing of all chunked hotels
"""
import psycopg2
from psycopg2.extras import RealDictCursor
from urllib.parse import unquote
import requests
import json
import logging
from datetime import datetime
import time
import sys
# Logging setup: one timestamped log file per run, mirrored to stdout.
_LOG_FILE = f'spb_processing_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(_LOG_FILE),
        logging.StreamHandler(sys.stdout),
    ],
)

# NOTE(review): credentials and API keys are hard-coded in source — move to
# environment variables / a secrets store before sharing this file further.
DB_CONFIG = {
    'host': '147.45.189.234',
    'port': 5432,
    'database': 'default_db',
    'user': 'gen_user',
    # Password is stored URL-encoded; unquote restores the raw value.
    'password': unquote('2~~9_%5EkVsU%3F2%5CS'),
}

BGE_API_URL = 'http://147.45.146.17:8002/embed'
BGE_API_KEY = '22564b177aa73b6ac0b8642d7773350ff4c01d4983f028beff15ea247f09fa89'
N8N_WEBHOOK_URL = 'https://n8n.clientright.pro/webhook/6be4a7b9-a016-4252-841f-0ebca367914f'

# Region filter used by every query in SPBProcessor.
REGION = 'г. Санкт-Петербург'
class SPBProcessor:
    """Pipeline for Saint Petersburg hotels: page chunking + N8N audits.

    Owns a single PostgreSQL connection (dict-style rows via RealDictCursor)
    for its whole lifetime; call close() when done.  Work is selected and
    filtered by the module-level REGION constant.
    """

    def __init__(self):
        self.conn = psycopg2.connect(**DB_CONFIG, cursor_factory=RealDictCursor)
        self.cur = self.conn.cursor()

    def get_hotels_to_chunk(self):
        """Return (id, full_name) rows for hotels with cleaned text but no chunks yet."""
        self.cur.execute("""
SELECT DISTINCT h.id, h.full_name
FROM hotel_main h
JOIN hotel_website_processed hwp ON h.id = hwp.hotel_id
LEFT JOIN hotel_website_chunks hc ON h.id::text = hc.metadata->>'hotel_id'
WHERE h.region_name = %s
AND hwp.cleaned_text IS NOT NULL
AND hc.id IS NULL
ORDER BY h.full_name
""", (REGION,))
        return self.cur.fetchall()

    def get_hotels_to_audit(self):
        """Return (id, full_name) rows for chunked hotels lacking a v1.0_with_rkn audit."""
        self.cur.execute("""
SELECT DISTINCT h.id, h.full_name
FROM hotel_main h
JOIN hotel_website_chunks hc ON h.id::text = hc.metadata->>'hotel_id'
LEFT JOIN hotel_audit_results har ON h.id = har.hotel_id AND har.audit_version = 'v1.0_with_rkn'
WHERE h.region_name = %s
AND har.hotel_id IS NULL
ORDER BY h.full_name
""", (REGION,))
        return self.cur.fetchall()

    def chunk_text(self, text, chunk_size=1000, overlap=200):
        """Split text into overlapping character chunks.

        Consecutive chunks share `overlap` characters; whitespace-only chunks
        are dropped.  Returns a list of strings (empty for empty input).
        """
        # Fix: a non-positive step (overlap >= chunk_size) previously made the
        # while-loop run forever; clamp the stride to at least 1 character.
        step = max(1, chunk_size - overlap)
        chunks = []
        for start in range(0, len(text), step):
            chunk = text[start:start + chunk_size]
            if chunk.strip():
                chunks.append(chunk)
        return chunks

    def get_embeddings_batch(self, texts, max_retries=3):
        """Fetch embeddings for a batch of texts from the BGE-M3 API.

        Retries with exponential backoff (1s, 2s, ...) and returns None after
        `max_retries` failed attempts.
        """
        for attempt in range(max_retries):
            try:
                response = requests.post(
                    BGE_API_URL,
                    headers={
                        'X-API-Key': BGE_API_KEY,
                        'Content-Type': 'application/json'
                    },
                    json={'text': texts},
                    timeout=30
                )
                if response.status_code == 200:
                    return response.json().get('embeddings', [])
                logging.error(f"API вернул статус {response.status_code}: {response.text}")
            except Exception as e:
                logging.error(f"Ошибка получения эмбеддингов (попытка {attempt + 1}): {e}")
            # Fix: back off before retrying for HTTP failures too, not only for
            # raised exceptions.
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
        return None

    def process_hotel_chunks(self, hotel_id, hotel_name):
        """Re-chunk every cleaned page of one hotel and store chunks + embeddings.

        Old chunks are deleted first, then all inserts are committed in one
        transaction; on any error the transaction is rolled back and False is
        returned.
        """
        import uuid  # fix: was re-imported inside the innermost insert loop
        try:
            # Cleaned (boilerplate-free) text comes from hotel_website_processed.
            self.cur.execute("""
SELECT id, cleaned_text FROM hotel_website_processed
WHERE hotel_id = %s AND cleaned_text IS NOT NULL
""", (hotel_id,))
            pages = self.cur.fetchall()
            if not pages:
                logging.warning(f" ⚠️ Нет обработанных данных для {hotel_name}")
                return False

            # Drop stale chunks so the hotel is fully re-chunked.
            self.cur.execute(
                "DELETE FROM hotel_website_chunks WHERE metadata->>'hotel_id' = %s",
                (str(hotel_id),)
            )

            total_chunks = 0
            BATCH_SIZE = 8  # embedding API batch size
            for page in pages:
                cleaned_text = page['cleaned_text']
                # Skip empty pages and pages too short to carry useful content.
                if not cleaned_text or len(cleaned_text) < 100:
                    continue
                chunks = self.chunk_text(cleaned_text)
                if not chunks:
                    continue
                for i in range(0, len(chunks), BATCH_SIZE):
                    batch = chunks[i:i + BATCH_SIZE]
                    embeddings = self.get_embeddings_batch(batch)
                    if not embeddings or len(embeddings) != len(batch):
                        # Best-effort: a failed batch is logged and skipped,
                        # the rest of the hotel is still processed.
                        logging.error(f" ❌ Ошибка получения эмбеддингов для батча")
                        continue
                    for j, (chunk, embedding) in enumerate(zip(batch, embeddings)):
                        metadata = {
                            'hotel_id': str(hotel_id),
                            'chunk_index': i + j,
                            'page_id': page['id'],
                            'created_at': time.time()
                        }
                        self.cur.execute("""
INSERT INTO hotel_website_chunks (id, text, metadata, embedding)
VALUES (%s, %s, %s, %s::vector)
""", (str(uuid.uuid4()), chunk, json.dumps(metadata), json.dumps(embedding)))
                    total_chunks += len(batch)

            self.conn.commit()
            logging.info(f" ✅ Создано {total_chunks} chunks")
            return True
        except Exception as e:
            logging.error(f" ❌ Ошибка обработки {hotel_name}: {e}")
            self.conn.rollback()
            return False

    def audit_hotel(self, hotel_id, hotel_name):
        """Trigger an audit of one hotel via the N8N webhook and persist the result.

        Returns True only when the webhook answered 200 and the payload was saved.
        """
        try:
            response = requests.post(
                N8N_WEBHOOK_URL,
                json={'hotel_id': str(hotel_id)},
                timeout=300  # audits are slow; allow up to 5 minutes
            )
            if response.status_code == 200:
                self.save_audit_to_db(hotel_id, response.json())
                logging.info(f" ✅ Аудит завершён")
                return True
            logging.error(f" ❌ N8N вернул статус {response.status_code}")
            return False
        except Exception as e:
            logging.error(f" ❌ Ошибка аудита {hotel_name}: {e}")
            return False

    def save_audit_to_db(self, hotel_id, audit_data):
        """Persist audit results: per-criterion rows plus RKN status on hotel_main.

        Replaces any previous 'v1.0_with_rkn' results for this hotel; commits
        on success, rolls back and only logs on failure.
        """
        try:
            self.cur.execute("""
DELETE FROM hotel_audit_results
WHERE hotel_id = %s AND audit_version = 'v1.0_with_rkn'
""", (hotel_id,))

            for criterion_id, criterion_data in audit_data.get('audit_results', {}).items():
                status = criterion_data.get('status', 'unknown')
                ai_agent = criterion_data.get('ai_agent', {})
                self.cur.execute("""
INSERT INTO hotel_audit_results
(hotel_id, audit_version, criterion_id, status, ai_agent_data, created_at)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
                    hotel_id,
                    'v1.0_with_rkn',
                    # NOTE(review): a non-numeric criterion key raises here and
                    # rolls back the whole save — confirm N8N always sends ints.
                    int(criterion_id),
                    status,
                    json.dumps(ai_agent),
                    datetime.now()
                ))

            rkn_status = audit_data.get('rkn_status')
            if rkn_status:
                # Map the raw value onto the known statuses; anything
                # unrecognized is recorded as 'not_checked'.
                status_lower = rkn_status.lower()
                if status_lower in ('in_registry', 'not_in_registry', 'unclear'):
                    rkn_check_status = status_lower
                else:
                    rkn_check_status = 'not_checked'
                self.cur.execute("""
UPDATE hotel_main
SET rkn_check_status = %s,
rkn_last_check = %s
WHERE id = %s
""", (rkn_check_status, datetime.now(), hotel_id))

            self.conn.commit()
        except Exception as e:
            logging.error(f" ❌ Ошибка сохранения в БД: {e}")
            self.conn.rollback()

    def run_chunking(self):
        """Chunk every pending hotel; returns (success_count, failed_count)."""
        hotels = self.get_hotels_to_chunk()
        total = len(hotels)
        logging.info(f"🚀 НАЧИНАЕМ ЧАНКИНИЗАЦИЮ ПИТЕРА")
        logging.info(f" Отелей к обработке: {total}")
        success = 0
        failed = 0
        for i, hotel in enumerate(hotels, 1):
            logging.info(f"📦 [{i}/{total}] {hotel['full_name']}")
            if self.process_hotel_chunks(hotel['id'], hotel['full_name']):
                success += 1
            else:
                failed += 1
            if i % 10 == 0:
                logging.info(f" 📊 Прогресс: {success} успешно, {failed} ошибок")
        logging.info(f"\n✅ ЧАНКИНИЗАЦИЯ ЗАВЕРШЕНА")
        logging.info(f" Успешно: {success}")
        logging.info(f" Ошибок: {failed}")
        return success, failed

    def run_audit(self):
        """Audit every pending hotel; returns (success_count, failed_count)."""
        hotels = self.get_hotels_to_audit()
        total = len(hotels)
        logging.info(f"\n🔍 НАЧИНАЕМ АУДИТ ПИТЕРА")
        logging.info(f" Отелей к обработке: {total}")
        success = 0
        failed = 0
        for i, hotel in enumerate(hotels, 1):
            logging.info(f"🏨 [{i}/{total}] {hotel['full_name']}")
            if self.audit_hotel(hotel['id'], hotel['full_name']):
                success += 1
            else:
                failed += 1
            # Short pause so the webhook endpoint is not hammered.
            time.sleep(2)
            if i % 10 == 0:
                logging.info(f" 📊 Прогресс: {success} успешно, {failed} ошибок")
        logging.info(f"\n✅ АУДИТ ЗАВЕРШЁН")
        logging.info(f" Успешно: {success}")
        logging.info(f" Ошибок: {failed}")
        return success, failed

    def close(self):
        """Release the cursor and the database connection."""
        self.cur.close()
        self.conn.close()
if __name__ == '__main__':
    processor = SPBProcessor()
    try:
        banner = "=" * 60
        logging.info(banner)
        logging.info("🏛️ ДОЖАТИЕ САНКТ-ПЕТЕРБУРГА")
        logging.info(banner)

        # Stage 1: chunk every pending hotel, then Stage 2: audit them.
        chunk_success, chunk_failed = processor.run_chunking()
        audit_success, audit_failed = processor.run_audit()

        logging.info("\n" + banner)
        logging.info("🎉 ВСЕ ЭТАПЫ ЗАВЕРШЕНЫ!")
        logging.info(banner)
        logging.info(f"📦 Чанкинизация: {chunk_success} успешно, {chunk_failed} ошибок")
        logging.info(f"🔍 Аудит: {audit_success} успешно, {audit_failed} ошибок")
    finally:
        # Always release the DB connection, even if a stage raised.
        processor.close()