Files
hotels/audit_system_backup.py

1086 lines
50 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
Система аудита отелей по 18 критериям
- Автоматически определяет "нет сайта" = все критерии НЕТ
- Для отелей с сайтами: semantic search по критериям
- Сохраняет результаты в PostgreSQL
- Экспортирует в Excel
"""
import psycopg2
from psycopg2.extras import Json
import requests
import json
import logging
from datetime import datetime
from urllib.parse import unquote
from typing import List, Dict, Optional
import pandas as pd
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
DB_CONFIG = {
'host': "147.45.189.234",
'port': 5432,
'database': "default_db",
'user': "gen_user",
'password': unquote("2~~9_%5EkVsU%3F2%5CS")
}
# 18 критериев аудита
AUDIT_CRITERIA = [
{
'id': 1,
'name': 'Юридическая идентификация и верификация',
'query': 'полное наименование организации ОПФ ИНН ОГРН ЕГРЮЛ ЕГРИП проверить',
'keywords': ['инн', 'огрн', 'егрюл', 'егрип', 'организация', 'ооо', 'ип'],
'required_patterns': [
r'\b\d{10}\b', # ИНН юридического лица (10 цифр)
r'\b\d{12}\b', # ИНН ИП (12 цифр)
r'\b\d{13}\b', # ОГРН (13 цифр)
r'\b\d{15}\b', # ОГРНИП (15 цифр)
r'инн\s*:?\s*\d{10,12}', # ИНН: 1234567890 или 123456789012
r'огрн\s*:?\s*\d{13}', # ОГРН: 1234567890123
r'огрнип\s*:?\s*\d{15}', # ОГРНИП: 123456789012345
]
},
{
'id': 2,
'name': 'Адрес',
'query': 'юридический адрес фактический адрес местонахождение',
'keywords': ['адрес', 'address', 'местонахождение', 'г.', 'ул.'],
'priority_patterns': [
r'\d{6}.*?ул\.', # Индекс (689251) + что-то + ул.
r'ул\.\s*[А-Яа-яёЁA-Za-z\s]+,?\s*\d+', # ул. Название, дом
]
},
{
'id': 3,
'name': 'Контакты',
'query': 'телефон email форма обратной связи чат контакты',
'keywords': ['телефон', 'phone', 'email', '@', '+7', '8-800'],
'priority_patterns': [
r'(?:\+7|8)\s*\(?\d{3,5}\)?\s*\d{1,3}[-\s]?\d{2}[-\s]?\d{2}', # Телефон: +7(xxx)xxx-xx-xx или 8(xxx)xxx-xx-xx
r'[\w\.-]+@[\w\.-]+\.\w{2,}', # Email: name@domain.com
]
},
{
'id': 4,
'name': 'Режим работы',
'query': 'часы работы график приема режим работы колл-центр',
'keywords': ['часы работы', 'график работы', 'режим работы', 'работаем', 'круглосуточно', '24/7', 'пн-пт', 'пн-вс'],
'priority_patterns': [
r'(?:с|с\s+)\d{1,2}(?::|\.)\d{2}\s*(?:до|по)\s*\d{1,2}(?::|\.)\d{2}', # с 9:00 до 18:00
r'(?:с|с\s+)\d{1,2}\s+(?:до|по)\s+\d{1,2}(?:\s+час)', # с 9 до 18 часов (с обязательным "час")
r'круглосуточно', # круглосуточно
r'24\s*[/\-]\s*7', # 24/7
]
},
{
'id': 5,
'name': 'Политика ПДн (152-ФЗ)',
'query': 'политика персональных данных обработка ПДн 152-ФЗ',
'keywords': ['персональных данных', 'пдн', '152-фз', 'privacy'],
'priority_patterns': [
r'политика\s+в\s+отношении\s+обработки\s+персональных\s+данных', # Точное название документа
r'152[-\s]?фз', # 152-ФЗ
r'федеральный\s+закон.*?персональных\s+данных', # Федеральный закон о персональных данных
]
},
{
'id': 6,
'name': 'Роскомнадзор (реестр)',
'query': 'реестр операторов персональных данных Роскомнадзор',
'keywords': ['роскомнадзор', 'реестр оператор']
},
{
'id': 7,
'name': 'Договор-оферта / Правила оказания услуг',
'query': 'публичная оферта договор пользовательское соглашение условия оказания услуг правила проживания бронирования размещения посещения',
'keywords': [
# Договор/оферта
'публичная оферта', 'публичный договор', 'договор оказания услуг', 'договор на оказание',
'пользовательское соглашение', 'условия договора',
# Правила/условия (специфичные для отелей)
'условия проживания', 'правила проживания', 'условия размещения', 'правила размещения',
'условия бронирования', 'правила отмены', 'условия оказания услуг', 'входит в стоимость',
'правила оказания услуг', 'порядок оказания услуг',
# Универсальные (для любых организаций)
'правила посещения', 'правила использования', 'правила и условия',
'terms and conditions', 'terms of service', 'terms of use'
],
'priority_patterns': [
r'правила\s+посещения\s+(?:парка|территории|объекта)', # "Правила посещения" как заголовок (не в меню)
r'публичная\s+оферта', # Публичная оферта
r'договор.*?оказани.*?услуг', # Договор оказания услуг
]
},
{
'id': 8,
'name': 'Рекламации и споры',
'query': 'претензия рекламация споры возврат обмен гарантия жалоба',
'keywords': ['претензия', 'претензии', 'рекламация', 'рекламации', 'возврат средств', 'возврат денег', 'порядок рассмотрения споров', 'досудебный порядок', 'жалоба', 'жалобы', 'книга жалоб']
},
{
'id': 9,
'name': 'Цены/прайс',
'query': 'цены прайс стоимость тарифы',
'keywords': ['цен', 'руб', '', 'price', 'стоимость', 'тариф', 'платн'],
'priority_urls': ['price', 'prices', 'ceny', 'tarif', 'tariff', 'stoimost'] # Приоритетные URL для поиска
},
{
'id': 10,
'name': 'Способы оплаты',
'query': 'способы оплаты наличные карта СБП оплата банковская карта',
'keywords': ['способы оплаты', 'методы оплаты', 'оплата картой', 'банковская карта', 'банковские карты', 'наличные', 'наличными', 'сбп', 'qr-код', 'visa', 'mastercard', 'мир']
},
{
'id': 11,
'name': 'Онлайн-оплата',
'query': 'онлайн оплата эквайринг оплатить online payment',
'keywords': ['онлайн оплат', 'эквайринг', 'оплатить']
},
{
'id': 12,
'name': 'Онлайн-бронирование',
'query': 'онлайн бронирование забронировать booking',
'keywords': ['бронирован', 'забронировать', 'booking']
},
{
'id': 13,
'name': 'FAQ',
'query': 'FAQ частые вопросы вопрос-ответ часто задаваемые',
'keywords': ['faq', 'частые вопросы', 'часто задаваемые вопросы', 'часто задаваемые', 'вопрос-ответ', 'вопросы и ответы'],
'priority_urls': ['faq', 'chasto-zadavaemye', 'voprosy', 'questions']
},
{
'id': 14,
'name': 'Доступность для ЛОВЗ',
'query': 'доступность инвалиды ЛОВЗ безбарьерная среда маломобильные',
'keywords': ['для инвалидов', 'для лиц с ограниченными возможностями', 'ловз', 'доступная среда', 'безбарьерная среда', 'маломобильные граждане', 'пандус', 'поручни', 'доступность для инвалидов', 'специальные условия']
},
{
'id': 15,
'name': 'Партнёры/бренды',
'query': 'партнеры поставщики бренды сотрудничество',
'keywords': ['партнер', 'поставщик', 'бренд'],
'priority_urls': ['partner', 'partners', 'partnery', 'brand', 'brands', 'sotrudnichestvo']
},
{
'id': 16,
'name': 'Команда/сотрудники',
'query': 'команда сотрудники персонал руководство',
'keywords': ['команда', 'сотрудник', 'персонал', 'руководство'],
'priority_urls': ['sotrudniki', 'staff', 'team', 'komanda', 'personal', 'about-us', 'o-nas']
},
{
'id': 17,
'name': 'Уголок потребителя',
'query': 'уголок потребителя права закон защита потребителей',
'keywords': ['уголок потребител', 'права потребител', 'защита потребител']
},
{
'id': 18,
'name': 'Актуальность документов',
'query': 'дата обновления дата публикации актуально версия',
'keywords': ['дата обновления', 'дата публикации', 'обновлено', 'опубликовано', 'актуально на', 'версия от', 'дата создания', 'последнее обновление']
}
]
class AuditSystem:
def __init__(self, region_name: str, group_id: str):
self.region_name = region_name
self.group_id = group_id
self.conn = None
def connect_db(self):
self.conn = psycopg2.connect(**DB_CONFIG)
def create_audit_table(self):
"""Создать таблицу для результатов аудита"""
cur = self.conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS hotel_audit_results (
id SERIAL PRIMARY KEY,
hotel_id UUID REFERENCES hotel_main(id),
region_name TEXT,
hotel_name TEXT,
website TEXT,
has_website BOOLEAN,
-- Результаты по 18 критериям
criteria_results JSONB,
-- Общие метрики
total_score INTEGER, -- Сумма найденных критериев
max_score INTEGER DEFAULT 18,
score_percentage FLOAT,
-- Мета
audit_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
audit_version TEXT,
UNIQUE(hotel_id, audit_version)
);
CREATE INDEX IF NOT EXISTS idx_audit_region ON hotel_audit_results(region_name);
CREATE INDEX IF NOT EXISTS idx_audit_score ON hotel_audit_results(total_score);
""")
self.conn.commit()
cur.close()
logger.info("✓ Таблица hotel_audit_results готова")
def get_hotels_for_audit(self) -> List[Dict]:
"""Получить отели региона для аудита"""
cur = self.conn.cursor()
cur.execute("""
SELECT
m.id,
m.full_name,
r.main_data->>'websiteAddress' as website
FROM hotel_main m
LEFT JOIN hotel_raw_json r ON m.id = r.hotel_id
WHERE m.region_name = %s
AND r.hotel_id IS NOT NULL
ORDER BY
CASE WHEN r.main_data->>'websiteAddress' IS NOT NULL THEN 0 ELSE 1 END,
m.full_name;
""", (self.region_name,))
hotels = []
for row in cur.fetchall():
hotels.append({
'id': str(row[0]),
'name': row[1],
'website': row[2],
'has_website': bool(row[2])
})
cur.close()
return hotels
def audit_hotel_no_website(self, hotel: Dict) -> Dict:
"""Аудит отеля БЕЗ сайта - все критерии автоматом НЕТ"""
results = {}
for criterion in AUDIT_CRITERIA:
results[f"criterion_{criterion['id']:02d}"] = {
'name': criterion['name'],
'found': False,
'reason': 'Нет сайта',
'quote': None,
'score': 0.0,
'verdict': 'НЕТ'
}
return {
'has_website': False,
'criteria_results': results,
'total_score': 0,
'max_score': 18,
'score_percentage': 0.0
}
def audit_hotel_with_website(self, hotel: Dict) -> Dict:
"""Аудит отеля С сайтом - semantic search"""
logger.info(f"🔍 Аудит: {hotel['name'][:50]}")
results = {}
total_score = 0
for criterion in AUDIT_CRITERIA:
logger.info(f" Критерий {criterion['id']}: {criterion['name']}")
try:
# Специальная проверка для критерия 6 (Роскомнадзор)
if criterion['id'] == 6:
result = self.check_rkn_registry(hotel['id'], criterion)
else:
# Semantic search через Neo4j
response = requests.post(
'http://localhost:9200/upload', # используем для проверки
timeout=5
)
# Упрощённо: проверяем по keywords в сохранённом тексте
# (для production нужен полноценный semantic search)
result = self.check_criterion_simple(hotel['id'], criterion)
results[f"criterion_{criterion['id']:02d}"] = result
# Считаем балл только если verdict = ДА (не ЧАСТИЧНО и не НЕТ)
if result.get('verdict') == 'ДА':
total_score += 1
except Exception as e:
logger.error(f" Ошибка: {e}")
results[f"criterion_{criterion['id']:02d}"] = {
'name': criterion['name'],
'found': False,
'reason': f'Ошибка поиска: {str(e)[:100]}',
'quote': None,
'score': 0.0,
'verdict': 'ОШИБКА'
}
return {
'has_website': True,
'criteria_results': results,
'total_score': total_score,
'max_score': 18,
'score_percentage': total_score / 18 * 100
}
def check_rkn_registry(self, hotel_id: str, criterion: Dict) -> Dict:
"""Проверка регистрации в реестре Роскомнадзора (из автоматической проверки)"""
cur = self.conn.cursor()
# Получаем данные из hotel_main
cur.execute("""
SELECT rkn_registry_status, rkn_registry_number, rkn_registry_date
FROM hotel_main
WHERE id = %s
""", (hotel_id,))
result = cur.fetchone()
cur.close()
if not result:
return {
'name': criterion['name'],
'found': False,
'keywords_found': [],
'patterns_found': [],
'confidence': 0.0,
'quote': 'Нет данных о проверке в РКН',
'approval_urls': [],
'approval_quotes': [],
'score': 0.0,
'verdict': 'НЕТ'
}
rkn_status, rkn_number, rkn_date = result
# Определяем вердикт на основе статуса
if rkn_status == 'found':
found = True
confidence = 1.0
verdict = 'ДА'
quote = f"ИНН найден в реестре РКН. Номер: {rkn_number}, дата: {rkn_date}"
approval_url = f"https://pd.rkn.gov.ru/operators-registry/operators-list/ (Найдено: {rkn_number})"
elif rkn_status == 'not_found':
found = False
confidence = 0.0
verdict = 'НЕТ'
quote = 'ИНН НЕ найден в реестре операторов ПДн Роскомнадзора'
approval_url = 'https://pd.rkn.gov.ru/operators-registry/operators-list/ (ИНН не найден)'
else: # unclear или None
found = False
confidence = 0.0
verdict = 'НЕТ'
quote = f'Статус проверки РКН: {rkn_status or "не проверено"}'
approval_url = f'https://pd.rkn.gov.ru/operators-registry/operators-list/ (Статус: {rkn_status or "не проверено"})'
return {
'name': criterion['name'],
'found': found,
'keywords_found': [],
'patterns_found': [],
'confidence': confidence,
'quote': quote,
'approval_urls': [approval_url], # Всегда показываем результат проверки
'approval_quotes': [{'url': 'РКН реестр', 'quote': quote, 'keyword': 'Автоматическая проверка'}],
'score': confidence,
'verdict': verdict
}
def check_document_freshness(self, hotel_id: str, criterion: Dict) -> Dict:
"""Проверка актуальности документов через Last-Modified заголовок"""
from datetime import datetime, timezone
cur = self.conn.cursor()
# Получаем last_modified для всех страниц
cur.execute("""
SELECT r.url, r.last_modified, r.crawled_at
FROM hotel_website_raw r
WHERE r.hotel_id = %s AND r.last_modified IS NOT NULL
ORDER BY r.last_modified DESC
LIMIT 5
""", (hotel_id,))
pages_with_dates = cur.fetchall()
cur.close()
if not pages_with_dates:
# Нет Last-Modified заголовков - пробуем текстовый поиск
return self.check_criterion_text_dates(hotel_id, criterion)
# Берем самую свежую дату обновления
latest_url, latest_modified, crawled_at = pages_with_dates[0]
# Считаем разницу в днях
now = datetime.now(timezone.utc)
# Если last_modified без timezone, добавляем UTC
if latest_modified.tzinfo is None:
latest_modified = latest_modified.replace(tzinfo=timezone.utc)
days_diff = (now - latest_modified).days
# Определяем вердикт
if days_diff < 180: # < 6 месяцев
found = True
confidence = 1.0
verdict = "ДА"
quote = f"Последнее обновление: {latest_modified.strftime('%d.%m.%Y')} ({days_diff} дней назад)"
elif days_diff < 365: # 6-12 месяцев
found = True
confidence = 0.5
verdict = "ЧАСТИЧНО"
quote = f"Обновление: {latest_modified.strftime('%d.%m.%Y')} ({days_diff} дней назад) - требует проверки"
else: # > 12 месяцев
found = False
confidence = 0.0
verdict = "НЕТ"
quote = f"Устарело: последнее обновление {latest_modified.strftime('%d.%m.%Y')} ({days_diff} дней назад)"
# Собираем approval URLs со всеми датами
approval_urls = []
approval_quotes = []
for url, modified, _ in pages_with_dates:
approval_urls.append(url)
if modified.tzinfo is None:
modified = modified.replace(tzinfo=timezone.utc)
days = (now - modified).days
approval_quotes.append({
'url': url,
'quote': f"Last-Modified: {modified.strftime('%d.%m.%Y')} ({days} дн. назад)",
'keyword': 'HTTP Last-Modified'
})
return {
'name': criterion['name'],
'found': found,
'keywords_found': ['Last-Modified (HTTP заголовок)'],
'patterns_found': [],
'confidence': confidence,
'quote': quote,
'approval_urls': approval_urls,
'approval_quotes': approval_quotes,
'score': confidence,
'verdict': verdict
}
def check_criterion_text_dates(self, hotel_id: str, criterion: Dict) -> Dict:
"""Запасной вариант: проверка дат в тексте страницы"""
cur = self.conn.cursor()
# Ищем текстовые упоминания дат
cur.execute("""
SELECT cleaned_text, url
FROM hotel_website_processed
WHERE hotel_id = %s
ORDER BY id
""", (hotel_id,))
all_text = ""
approval_urls = []
for row in cur.fetchall():
text, url = row
page_text = text.lower()
# Проверяем по keywords
for keyword in criterion['keywords']:
if keyword.lower() in page_text:
if url not in approval_urls:
approval_urls.append(url)
cur.close()
# Если нашли текстовые упоминания - частично
if approval_urls:
return {
'name': criterion['name'],
'found': True,
'keywords_found': criterion['keywords'],
'patterns_found': [],
'confidence': 0.5,
'quote': 'Найдены текстовые упоминания дат (без HTTP Last-Modified)',
'approval_urls': approval_urls,
'approval_quotes': [],
'score': 0.5,
'verdict': 'ЧАСТИЧНО'
}
else:
return {
'name': criterion['name'],
'found': False,
'keywords_found': [],
'patterns_found': [],
'confidence': 0.0,
'quote': 'Нет информации о датах обновления',
'approval_urls': [],
'approval_quotes': [],
'score': 0.0,
'verdict': 'НЕТ'
}
def check_criterion_simple(self, hotel_id: str, criterion: Dict) -> Dict:
"""Упрощённая проверка критерия по keywords с сохранением апрувов"""
cur = self.conn.cursor()
# СПЕЦИАЛЬНАЯ ПРОВЕРКА ДЛЯ КРИТЕРИЯ 18 (Актуальность документов)
if criterion['id'] == 18:
return self.check_document_freshness(hotel_id, criterion)
# Ищем в обработанном тексте с URL
cur.execute("""
SELECT cleaned_text, extracted_data, url
FROM hotel_website_processed
WHERE hotel_id = %s
ORDER BY id
""", (hotel_id,))
all_text = ""
all_data = {}
approval_urls = [] # Список URL где найдена информация
for row in cur.fetchall():
text, data, url = row
page_text = text.lower()
all_text += " " + page_text
if data:
for key, value in data.items():
if key not in all_data:
all_data[key] = []
if isinstance(value, list):
all_data[key].extend(value)
cur.close()
# Проверяем по keywords и регулярным выражениям
found_keywords = []
found_patterns = []
found_priority_patterns = []
approval_quotes = [] # Цитаты с URL где найдена информация
import re
# ПРИОРИТЕТ 1: Проверяем priority_patterns (для адресов, ИНН/ОГРН)
if criterion.get('priority_patterns'):
for pattern in criterion['priority_patterns']:
if re.search(pattern, all_text, re.IGNORECASE):
found_priority_patterns.append(pattern)
# Проверяем keywords (точные слова, не подстроки)
for keyword in criterion['keywords']:
# Используем регулярное выражение для поиска точных слов
pattern = r'\b' + re.escape(keyword.lower()) + r'\b'
if re.search(pattern, all_text, re.IGNORECASE):
found_keywords.append(keyword)
# Проверяем регулярные выражения (для точного поиска ИНН/ОГРН)
if criterion.get('required_patterns'):
for pattern in criterion['required_patterns']:
if re.search(pattern, all_text, re.IGNORECASE):
found_patterns.append(pattern)
# Ищем на каких страницах найдена информация
cur = self.conn.cursor()
# СПЕЦИАЛЬНАЯ ЛОГИКА ДЛЯ КРИТЕРИЯ 16: приоритет страницам сотрудников (В НАЧАЛЕ!)
if criterion['id'] == 16 and found_keywords and not approval_urls:
priority_urls = ['sotrudniki', 'staff', 'team', 'komanda', 'personal', 'about-us', 'o-nas']
for priority_path in priority_urls:
cur.execute("""
SELECT cleaned_text, url
FROM hotel_website_processed
WHERE hotel_id = %s AND url LIKE %s
LIMIT 1
""", (hotel_id, f'%{priority_path}%'))
result = cur.fetchone()
if result:
text_row, url_row = result
# Проверяем что на этой странице есть хоть одно ключевое слово
text_lower = text_row.lower()
for kw in found_keywords:
pattern = r'\b' + re.escape(kw.lower()) + r'\b'
if re.search(pattern, text_lower, re.IGNORECASE):
approval_urls.append(url_row)
# Ищем точное вхождение ключевого слова
match = re.search(r'\b' + re.escape(kw.lower()) + r'\b', text_lower, re.IGNORECASE)
if match:
idx = match.start()
start = max(0, idx - 100)
end = min(len(text_row), idx + 200)
quote_text = text_row[start:end].strip()
else:
quote_text = text_row[:200] + "..."
approval_quotes.append({
'url': url_row,
'quote': quote_text,
'keyword': kw
})
break
if approval_urls:
break # Нашли на странице сотрудников - хватит
# СПЕЦИАЛЬНАЯ ЛОГИКА: приоритет URL для поиска апрувов
# Используется для критериев с priority_urls (цены, правила, FAQ и т.д.)
if criterion.get('priority_urls') and len(found_keywords) > 0:
priority_urls = criterion['priority_urls']
elif criterion['id'] == 7 and len(found_keywords) > 0:
# Для критерия 7 (Договор/Правила) - своя логика
priority_urls = ['pravila', 'rules', 'oferta', 'offer', 'terms', 'agreement', 'soglashenie', 'dogovor', 'contract']
elif criterion['id'] == 16 and len(found_keywords) > 0:
# Для критерия 16 (Команда/сотрудники) - приоритет страницам сотрудников
priority_urls = ['sotrudniki', 'staff', 'team', 'komanda', 'personal', 'about-us', 'o-nas']
else:
priority_urls = None
if priority_urls:
for priority_path in priority_urls:
cur.execute("""
SELECT cleaned_text, url
FROM hotel_website_processed
WHERE hotel_id = %s AND url LIKE %s
LIMIT 1
""", (hotel_id, f'%{priority_path}%'))
result = cur.fetchone()
if result:
text_row, url_row = result
# Проверяем что на этой странице есть хоть одно ключевое слово (точные слова)
text_lower = text_row.lower()
for kw in found_keywords:
pattern = r'\b' + re.escape(kw.lower()) + r'\b'
if re.search(pattern, text_lower, re.IGNORECASE):
approval_urls.append(url_row)
# Ищем точное вхождение ключевого слова
match = re.search(r'\b' + re.escape(kw.lower()) + r'\b', text_lower, re.IGNORECASE)
if match:
idx = match.start()
start = max(0, idx - 100)
end = min(len(text_row), idx + 200)
quote_text = text_row[start:end].strip()
else:
quote_text = text_row[:200] + "..."
approval_quotes.append({
'url': url_row,
'quote': quote_text,
'keyword': kw
})
break
if approval_urls:
break # Нашли на приоритетной странице - хватит
# ПРИОРИТЕТ: Сначала ищем по priority_patterns (более специфичные)
# НО для критерия 16 (Команда/сотрудники) приоритетные URL важнее
if found_priority_patterns and not approval_urls and criterion['id'] != 16:
for pattern in found_priority_patterns:
cur.execute("""
SELECT cleaned_text, url
FROM hotel_website_processed
WHERE hotel_id = %s
""", (hotel_id,))
for text_row, url_row in cur.fetchall():
match = re.search(pattern, text_row, re.IGNORECASE)
if match:
if url_row not in approval_urls:
approval_urls.append(url_row)
# Извлекаем цитату с найденным паттерном
start = max(0, match.start() - 100)
end = min(len(text_row), match.end() + 200)
quote_text = text_row[start:end].strip()
approval_quotes.append({
'url': url_row,
'quote': quote_text,
'keyword': f"АДРЕС: {match.group()[:50]}"
})
break # Берем только первое совпадение
if approval_urls:
break # Нашли - хватит
# СПЕЦИАЛЬНАЯ ЛОГИКА ДЛЯ КРИТЕРИЯ 16 уже выполнена в начале функции
# Если не нашли по priority_patterns - ищем по keywords
if not approval_urls:
for keyword in found_keywords:
# Используем регулярное выражение PostgreSQL для точного поиска слов
cur.execute("""
SELECT cleaned_text, url
FROM hotel_website_processed
WHERE hotel_id = %s AND LOWER(cleaned_text) ~ %s
""", (hotel_id, r'\y' + keyword.lower() + r'\y'))
for text_row, url_row in cur.fetchall():
if url_row not in approval_urls:
approval_urls.append(url_row)
# Извлекаем цитату с контекстом (точный поиск слова)
pattern = r'\b' + re.escape(keyword.lower()) + r'\b'
match = re.search(pattern, text_row.lower(), re.IGNORECASE)
if match:
idx = match.start()
start = max(0, idx - 100)
end = min(len(text_row), idx + 200)
quote_text = text_row[start:end].strip()
approval_quotes.append({
'url': url_row,
'quote': quote_text,
'keyword': keyword
})
# Поиск по регулярным выражениям
if found_patterns:
import re
for pattern in found_patterns:
cur.execute("""
SELECT cleaned_text, url
FROM hotel_website_processed
WHERE hotel_id = %s
""", (hotel_id,))
for text_row, url_row in cur.fetchall():
if re.search(pattern, text_row, re.IGNORECASE):
if url_row not in approval_urls:
approval_urls.append(url_row)
# Извлекаем цитату с найденным паттерном
match = re.search(pattern, text_row, re.IGNORECASE)
if match:
start = max(0, match.start() - 100)
end = min(len(text_row), match.end() + 200)
quote_text = text_row[start:end].strip()
approval_quotes.append({
'url': url_row,
'quote': quote_text,
'keyword': f"ПАТТЕРН: {match.group()}"
})
cur.close()
# Определяем результат на основе keywords и паттернов
found = len(found_keywords) > 0 or len(found_patterns) > 0
# Для критерия "Юридическая идентификация" требуем ТОЛЬКО конкретные ИНН/ОГРН
if criterion['id'] == 1: # Юр. идентификация
if found_patterns: # Найдены ИНН/ОГРН по паттернам
confidence = 1.0
verdict = "ДА"
else:
# Только если НЕТ паттернов - ставим НЕТ, даже если есть keywords
confidence = 0.0
verdict = "НЕТ"
elif criterion['id'] == 7: # Договор-оферта / Правила оказания услуг
# Для критерия 7: если нашли хоть ОДНО ключевое слово → ДА
# (критерий объединенный и универсальный, поэтому порог низкий)
if len(found_keywords) > 0 or len(found_priority_patterns) > 0:
confidence = 1.0
verdict = "ДА"
else:
confidence = 0.0
verdict = "НЕТ"
else:
# Для остальных критериев - обычная логика
confidence = len(found_keywords) / len(criterion['keywords']) if len(criterion['keywords']) > 0 else 0.0
verdict = "ДА" if confidence > 0.3 else "ЧАСТИЧНО" if confidence > 0.1 else "НЕТ"
# Извлекаем релевантный фрагмент (первый найденный)
quote = None
if approval_quotes:
quote = approval_quotes[0]['quote']
return {
'name': criterion['name'],
'found': found,
'keywords_found': found_keywords,
'patterns_found': found_patterns, # Найденные паттерны ИНН/ОГРН
'confidence': confidence,
'quote': quote,
'approval_urls': approval_urls, # URL где найдена информация
'approval_quotes': approval_quotes, # Цитаты с URL
'score': confidence,
'verdict': verdict
}
def save_audit_result(self, hotel: Dict, audit_result: Dict):
"""Сохранить результат аудита"""
cur = self.conn.cursor()
cur.execute("""
INSERT INTO hotel_audit_results
(hotel_id, region_name, hotel_name, website, has_website,
criteria_results, total_score, score_percentage, audit_version)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (hotel_id, audit_version) DO UPDATE SET
criteria_results = EXCLUDED.criteria_results,
total_score = EXCLUDED.total_score,
score_percentage = EXCLUDED.score_percentage,
audit_date = CURRENT_TIMESTAMP
""", (
hotel['id'],
self.region_name,
hotel['name'],
hotel.get('website'),
audit_result['has_website'],
Json(audit_result['criteria_results']),
audit_result['total_score'],
audit_result['score_percentage'],
'v1.0'
))
self.conn.commit()
cur.close()
def run_audit(self):
"""Запустить аудит региона"""
self.connect_db()
self.create_audit_table()
logger.info(f"🚀 Запуск аудита: {self.region_name}")
# Получаем отели
hotels = self.get_hotels_for_audit()
total = len(hotels)
with_sites = sum(1 for h in hotels if h['has_website'])
without_sites = total - with_sites
logger.info(f"📊 Отелей: {total}")
logger.info(f" С сайтами: {with_sites} ({with_sites/total*100:.1f}%)")
logger.info(f" БЕЗ сайтов: {without_sites} ({without_sites/total*100:.1f}%)")
logger.info("")
# Аудит каждого отеля
for idx, hotel in enumerate(hotels, 1):
logger.info(f"[{idx}/{total}] {hotel['name'][:50]}")
if hotel['has_website']:
audit_result = self.audit_hotel_with_website(hotel)
logger.info(f" ✅ Сайт есть: {audit_result['total_score']}/18 критериев ({audit_result['score_percentage']:.0f}%)")
else:
audit_result = self.audit_hotel_no_website(hotel)
logger.info(f" ❌ Сайта нет: 0/18 (автоматически)")
# Сохраняем
self.save_audit_result(hotel, audit_result)
logger.info(f"\n✅ Аудит завершён!")
self.generate_report()
def generate_report(self):
"""Сгенерировать отчёт"""
cur = self.conn.cursor()
# Статистика
cur.execute("""
SELECT
count(*) as total,
count(*) FILTER (WHERE has_website) as with_websites,
avg(total_score) as avg_score,
avg(score_percentage) as avg_percentage
FROM hotel_audit_results
WHERE region_name = %s;
""", (self.region_name,))
total, with_sites, avg_score, avg_pct = cur.fetchone()
without_sites = total - (with_sites or 0)
print("\n" + "=" * 70)
print(f"📊 ИТОГОВЫЙ ОТЧЁТ: {self.region_name.upper()}")
print("=" * 70)
print(f"\n📈 ОБЩАЯ СТАТИСТИКА:")
print(f" Всего отелей проверено: {total}")
print(f" С сайтами: {with_sites or 0} ({(with_sites or 0)/total*100:.1f}%)")
print(f" БЕЗ сайтов: {without_sites} ({without_sites/total*100:.1f}%)")
print(f" Средний балл: {avg_score:.1f}/18 ({avg_pct:.1f}%)")
print(f"\n⚠️ ВАЖНО:")
print(f" Для {without_sites} отелей без сайтов")
print(f" автоматически установлено \"НЕТ\" по всем 18 критериям")
# Детальные результаты
print(f"\n🏨 РЕЗУЛЬТАТЫ ПО ОТЕЛЯМ:")
cur.execute("""
SELECT hotel_name, website, has_website, total_score
FROM hotel_audit_results
WHERE region_name = %s
ORDER BY has_website DESC, total_score DESC;
""", (self.region_name,))
for name, website, has_site, score in cur.fetchall():
site_mark = "🌐" if has_site else ""
print(f"\n {site_mark} {name[:55]}")
if has_site:
print(f" Сайт: {website}")
print(f" Балл: {score}/18")
else:
print(f" Сайта нет → 0/18 баллов")
print("\n" + "=" * 70)
# Экспорт в Excel
self.export_to_excel()
cur.close()
def export_to_excel(self):
"""Экспорт в Excel"""
cur = self.conn.cursor()
cur.execute("""
SELECT
hotel_name,
website,
has_website,
total_score,
score_percentage,
criteria_results,
audit_date
FROM hotel_audit_results
WHERE region_name = %s
ORDER BY has_website DESC, total_score DESC;
""", (self.region_name,))
rows = []
for row in cur.fetchall():
name, website, has_site, score, pct, criteria, date = row
row_data = {
'Отель': name,
'Сайт': website if has_site else 'НЕТ САЙТА',
'Есть сайт': 'Да' if has_site else 'Нет',
'Балл': score,
'Процент': f"{pct:.1f}%"
}
# Добавляем результаты по каждому критерию (3 колонки на критерий)
if criteria:
for i in range(1, 19):
key = f"criterion_{i:02d}"
if key in criteria:
result = criteria[key]
# КОЛОНКА 1: Результат (ДА/НЕТ/ЧАСТИЧНО)
row_data[f"{i}. {result['name']}"] = result['verdict']
# КОЛОНКА 2: Апрув URL (первая ссылка как гиперссылка)
if result.get('approval_urls') and len(result['approval_urls']) > 0:
first_url = result['approval_urls'][0]
# Если несколько URL - показываем количество
if len(result['approval_urls']) > 1:
url_text = f"{first_url} (+{len(result['approval_urls'])-1})"
else:
url_text = first_url
row_data[f"{i}. Апрув URL"] = url_text
else:
row_data[f"{i}. Апрув URL"] = "-"
# КОЛОНКА 3: Пояснение (краткая цитата, до 150 символов)
explanation = ""
if result.get('quote'):
# Обрезаем цитату до 150 символов
quote = result['quote']
if len(quote) > 150:
quote = quote[:147] + "..."
explanation = quote
elif result['verdict'] == 'НЕТ':
explanation = "Не найдено"
else:
explanation = "Найдено"
row_data[f"{i}. Пояснение"] = explanation
rows.append(row_data)
# Создаём Excel с гиперссылками
df = pd.DataFrame(rows)
filename = f"audit_{self.region_name.replace(' ', '_')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
# Используем openpyxl для создания гиперссылок
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import Font, Alignment, PatternFill
wb = Workbook()
ws = wb.active
ws.title = "Аудит"
# Записываем данные
for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), 1):
for c_idx, value in enumerate(row, 1):
cell = ws.cell(row=r_idx, column=c_idx, value=value)
# Форматирование заголовков
if r_idx == 1:
cell.font = Font(bold=True)
cell.fill = PatternFill(start_color="CCE5FF", end_color="CCE5FF", fill_type="solid")
cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
# Создаём гиперссылки для колонок с URL
for r_idx in range(2, ws.max_row + 1): # Пропускаем заголовок
for c_idx, col_name in enumerate(df.columns, 1):
if 'Апрув URL' in col_name:
cell = ws.cell(row=r_idx, column=c_idx)
url_text = cell.value
if url_text and url_text != "-" and isinstance(url_text, str):
# Извлекаем первый URL (убираем "(+N)")
url = url_text.split(' (+')[0].strip()
# Проверяем что это валидный URL
if url.startswith('http'):
# Создаём гиперссылку
cell.hyperlink = url
cell.font = Font(color="0563C1", underline="single")
cell.value = url # Показываем только URL без "(+N)"
# Автоширина колонок
for column in ws.columns:
max_length = 0
column_letter = column[0].column_letter
for cell in column:
try:
if cell.value:
max_length = max(max_length, len(str(cell.value)))
except:
pass
adjusted_width = min(max_length + 2, 60) # Макс 60 символов
ws.column_dimensions[column_letter].width = adjusted_width
wb.save(filename)
logger.info(f"📊 Экспортировано в: {filename}")
cur.close()
if __name__ == "__main__":
import sys
region = sys.argv[1] if len(sys.argv) > 1 else "Чукотский автономный округ"
group_id = sys.argv[2] if len(sys.argv) > 2 else "hotel_chukotka"
auditor = AuditSystem(region, group_id)
auditor.run_audit()