2026-02-20 09:31:13 +03:00
|
|
|
|
"""
|
|
|
|
|
|
MAX WebApp (Mini App) auth helper.
|
|
|
|
|
|
|
|
|
|
|
|
Валидация initData от MAX Bridge — тот же алгоритм, что и у Telegram:
|
|
|
|
|
|
secret_key = HMAC_SHA256("WebAppData", BOT_TOKEN), data_check_string без hash, сравнение с hash.
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import hashlib
|
|
|
|
|
|
import hmac
|
|
|
|
|
|
import json
|
|
|
|
|
|
import logging
|
|
|
|
|
|
from typing import Dict, Any
|
|
|
|
|
|
from urllib.parse import parse_qsl
|
|
|
|
|
|
|
|
|
|
|
|
from ..config import settings
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MaxAuthError(Exception):
|
|
|
|
|
|
"""Ошибка проверки подлинности MAX initData."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_init_data(init_data: str) -> Dict[str, Any]:
|
|
|
|
|
|
"""Разбирает строку initData (query string) в словарь."""
|
|
|
|
|
|
data: Dict[str, Any] = {}
|
|
|
|
|
|
for key, value in parse_qsl(init_data, keep_blank_values=True):
|
|
|
|
|
|
data[key] = value
|
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-02-27 07:48:16 +03:00
|
|
|
|
def _verify_with_token(parsed: Dict[str, Any], data_check_string: str, received_hash: str, bot_token: str) -> bool:
|
|
|
|
|
|
"""Проверяет подпись initData одним MAX ботом. Возвращает True, если подпись верна."""
|
|
|
|
|
|
secret_key = hmac.new(
|
|
|
|
|
|
key="WebAppData".encode("utf-8"),
|
|
|
|
|
|
msg=bot_token.encode("utf-8"),
|
|
|
|
|
|
digestmod=hashlib.sha256,
|
|
|
|
|
|
).digest()
|
|
|
|
|
|
calculated_hash = hmac.new(
|
|
|
|
|
|
key=secret_key,
|
|
|
|
|
|
msg=data_check_string.encode("utf-8"),
|
|
|
|
|
|
digestmod=hashlib.sha256,
|
|
|
|
|
|
).hexdigest()
|
|
|
|
|
|
return hmac.compare_digest(calculated_hash, received_hash)
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-02-20 09:31:13 +03:00
|
|
|
|
def verify_max_init_data(init_data: str) -> Dict[str, Any]:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Проверяет подпись initData по правилам MAX (аналогично Telegram).
|
|
|
|
|
|
|
2026-02-27 07:48:16 +03:00
|
|
|
|
Поддерживает один бот (MAX_BOT_TOKEN) или несколько (MAX_BOT_TOKENS — JSON).
|
|
|
|
|
|
Перебирает токены, пока один не подойдёт; в результат добавляется ключ bot_id.
|
|
|
|
|
|
|
2026-02-20 09:31:13 +03:00
|
|
|
|
- secret_key = HMAC_SHA256("WebAppData", BOT_TOKEN)
|
|
|
|
|
|
- data_check_string: пары key=value без hash, сортировка по key, разделитель \n
|
|
|
|
|
|
- hex(HMAC_SHA256(secret_key, data_check_string)) === hash из initData
|
|
|
|
|
|
"""
|
|
|
|
|
|
if not init_data:
|
|
|
|
|
|
logger.warning("[MAX] verify_max_init_data: init_data пустой")
|
|
|
|
|
|
raise MaxAuthError("init_data is empty")
|
|
|
|
|
|
|
2026-02-27 07:48:16 +03:00
|
|
|
|
tokens_list = settings.get_max_bot_tokens()
|
|
|
|
|
|
if not tokens_list:
|
|
|
|
|
|
logger.warning("[MAX] Ни MAX_BOT_TOKEN, ни MAX_BOT_TOKENS не заданы в .env")
|
2026-02-20 09:31:13 +03:00
|
|
|
|
raise MaxAuthError("MAX bot token is not configured")
|
|
|
|
|
|
|
|
|
|
|
|
parsed = _parse_init_data(init_data)
|
|
|
|
|
|
logger.info("[MAX] initData распарсен, ключи: %s", list(parsed.keys()))
|
|
|
|
|
|
|
|
|
|
|
|
received_hash = parsed.pop("hash", None)
|
|
|
|
|
|
if not received_hash:
|
|
|
|
|
|
logger.warning("[MAX] В initData отсутствует поле hash")
|
|
|
|
|
|
raise MaxAuthError("Missing hash in init_data")
|
|
|
|
|
|
|
2026-02-27 07:48:16 +03:00
|
|
|
|
data_check_items = [f"{k}={parsed[k]}" for k in sorted(parsed.keys())]
|
2026-02-20 09:31:13 +03:00
|
|
|
|
data_check_string = "\n".join(data_check_items)
|
|
|
|
|
|
|
2026-02-27 07:48:16 +03:00
|
|
|
|
for bot_id, bot_token in tokens_list:
|
|
|
|
|
|
if _verify_with_token(parsed, data_check_string, received_hash, bot_token):
|
|
|
|
|
|
parsed["bot_id"] = bot_id
|
|
|
|
|
|
logger.info("[MAX] Подпись MAX initData проверена, bot_id=%s", bot_id)
|
|
|
|
|
|
return parsed
|
2026-02-20 09:31:13 +03:00
|
|
|
|
|
2026-02-27 07:48:16 +03:00
|
|
|
|
logger.warning("[MAX] Подпись initData не совпадает ни с одним из токенов MAX ботов")
|
|
|
|
|
|
raise MaxAuthError("Invalid init_data hash")
|
2026-02-20 09:31:13 +03:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_max_user(init_data: str) -> Dict[str, Any]:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Валидирует initData и возвращает данные пользователя MAX.
|
|
|
|
|
|
|
|
|
|
|
|
В поле `user` — JSON с id, first_name, last_name, username, language_code, photo_url и т.д.
|
|
|
|
|
|
"""
|
|
|
|
|
|
parsed = verify_max_init_data(init_data)
|
|
|
|
|
|
|
|
|
|
|
|
user_raw = parsed.get("user")
|
|
|
|
|
|
if not user_raw:
|
|
|
|
|
|
logger.warning("[MAX] В initData отсутствует поле user")
|
|
|
|
|
|
raise MaxAuthError("No user field in init_data")
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
user_obj = json.loads(user_raw)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
raise MaxAuthError(f"Failed to parse user JSON: {e}") from e
|
|
|
|
|
|
|
|
|
|
|
|
if "id" not in user_obj:
|
|
|
|
|
|
raise MaxAuthError("MAX user.id is missing")
|
|
|
|
|
|
|
2026-02-27 07:48:16 +03:00
|
|
|
|
result = {
|
2026-02-20 09:31:13 +03:00
|
|
|
|
"max_user_id": str(user_obj.get("id")),
|
|
|
|
|
|
"username": user_obj.get("username"),
|
|
|
|
|
|
"first_name": user_obj.get("first_name"),
|
|
|
|
|
|
"last_name": user_obj.get("last_name"),
|
|
|
|
|
|
"language_code": user_obj.get("language_code"),
|
|
|
|
|
|
"photo_url": user_obj.get("photo_url"),
|
|
|
|
|
|
"raw": user_obj,
|
|
|
|
|
|
}
|
2026-02-27 07:48:16 +03:00
|
|
|
|
if "bot_id" in parsed:
|
|
|
|
|
|
result["bot_id"] = parsed["bot_id"]
|
|
|
|
|
|
return result
|