Files
crm.clientright.ru/ticket_form/backend/app/services/rabbitmq_service.py

227 lines
7.9 KiB
Python
Raw Normal View History

"""
RabbitMQ Service для асинхронной обработки задач
"""
import aio_pika
from aio_pika import Connection, Channel, Queue, Exchange, Message
from aio_pika.pool import Pool
from typing import Optional, Callable, Dict, Any
import json
import logging
from ..config import settings
logger = logging.getLogger(__name__)
class RabbitMQService:
"""Сервис для работы с RabbitMQ"""
# Названия очередей
QUEUE_OCR_PROCESSING = "erv_ocr_processing"
QUEUE_AI_EXTRACTION = "erv_ai_extraction"
QUEUE_FLIGHT_CHECK = "erv_flight_check"
QUEUE_CRM_INTEGRATION = "erv_crm_integration"
QUEUE_NOTIFICATIONS = "erv_notifications"
def __init__(self):
self.connection: Optional[Connection] = None
self.channel: Optional[Channel] = None
self.queues: Dict[str, Queue] = {}
async def connect(self):
"""Подключение к RabbitMQ"""
try:
self.connection = await aio_pika.connect_robust(
settings.rabbitmq_url,
timeout=30
)
self.channel = await self.connection.channel()
await self.channel.set_qos(prefetch_count=10)
logger.info(f"✅ RabbitMQ connected: {settings.rabbitmq_host}:{settings.rabbitmq_port}")
# Объявляем очереди
await self._declare_queues()
except Exception as e:
logger.error(f"❌ RabbitMQ connection error: {e}")
raise
async def disconnect(self):
"""Отключение от RabbitMQ"""
if self.connection:
await self.connection.close()
logger.info("RabbitMQ connection closed")
async def _declare_queues(self):
"""Объявляем все рабочие очереди"""
queue_names = [
self.QUEUE_OCR_PROCESSING,
self.QUEUE_AI_EXTRACTION,
self.QUEUE_FLIGHT_CHECK,
self.QUEUE_CRM_INTEGRATION,
self.QUEUE_NOTIFICATIONS,
]
for queue_name in queue_names:
queue = await self.channel.declare_queue(
queue_name,
durable=True, # Очередь переживет перезапуск
arguments={
"x-message-ttl": 3600000, # TTL сообщений 1 час
"x-max-length": 10000, # Максимум сообщений в очереди
}
)
self.queues[queue_name] = queue
logger.info(f"✅ Queue declared: {queue_name}")
async def publish(
self,
queue_name: str,
message: Dict[str, Any],
priority: int = 5,
headers: Optional[Dict[str, Any]] = None
):
"""
Публикация сообщения в очередь
Args:
queue_name: Название очереди
message: Данные сообщения (dict)
priority: Приоритет (0-10, где 10 - максимальный)
headers: Дополнительные заголовки
"""
try:
msg_body = json.dumps(message).encode()
msg = Message(
body=msg_body,
priority=priority,
headers=headers or {},
content_type="application/json",
delivery_mode=aio_pika.DeliveryMode.PERSISTENT # Сохранять на диск
)
# Публикуем в default exchange с routing_key = queue_name
await self.channel.default_exchange.publish(
msg,
routing_key=queue_name
)
logger.debug(f"📤 Message published to {queue_name}: {message.get('task_id', 'unknown')}")
except Exception as e:
logger.error(f"❌ Failed to publish message to {queue_name}: {e}")
raise
async def consume(
self,
queue_name: str,
callback: Callable,
prefetch_count: int = 1
):
"""
Подписка на сообщения из очереди
Args:
queue_name: Название очереди
callback: Асинхронная функция-обработчик
prefetch_count: Количество сообщений для одновременной обработки
"""
try:
queue = self.queues.get(queue_name)
if not queue:
logger.error(f"Queue {queue_name} not found")
return
await self.channel.set_qos(prefetch_count=prefetch_count)
await queue.consume(callback)
logger.info(f"👂 Consuming from {queue_name}")
except Exception as e:
logger.error(f"❌ Failed to consume from {queue_name}: {e}")
raise
async def health_check(self) -> bool:
"""Проверка здоровья RabbitMQ"""
try:
if self.connection and not self.connection.is_closed:
return True
return False
except Exception as e:
logger.error(f"RabbitMQ health check failed: {e}")
return False
# ============================================
# ВСПОМОГАТЕЛЬНЫЕ МЕТОДЫ ДЛЯ ЗАДАЧ
# ============================================
async def publish_ocr_task(self, claim_id: str, file_id: str, file_path: str):
"""Отправка задачи на OCR обработку"""
await self.publish(
self.QUEUE_OCR_PROCESSING,
{
"task_type": "ocr_processing",
"claim_id": claim_id,
"file_id": file_id,
"file_path": file_path
},
priority=8
)
async def publish_ai_extraction_task(self, claim_id: str, file_id: str, ocr_text: str):
"""Отправка задачи на AI извлечение данных"""
await self.publish(
self.QUEUE_AI_EXTRACTION,
{
"task_type": "ai_extraction",
"claim_id": claim_id,
"file_id": file_id,
"ocr_text": ocr_text
},
priority=7
)
async def publish_flight_check_task(self, claim_id: str, flight_number: str, flight_date: str):
"""Отправка задачи на проверку рейса"""
await self.publish(
self.QUEUE_FLIGHT_CHECK,
{
"task_type": "flight_check",
"claim_id": claim_id,
"flight_number": flight_number,
"flight_date": flight_date
},
priority=6
)
async def publish_crm_integration_task(self, claim_id: str, form_data: Dict[str, Any]):
"""Отправка задачи на интеграцию с CRM"""
await self.publish(
self.QUEUE_CRM_INTEGRATION,
{
"task_type": "crm_integration",
"claim_id": claim_id,
"form_data": form_data
},
priority=9 # Высокий приоритет
)
async def publish_notification_task(self, claim_id: str, notification_type: str, data: Dict[str, Any]):
"""Отправка задачи на отправку уведомления"""
await self.publish(
self.QUEUE_NOTIFICATIONS,
{
"task_type": "notification",
"claim_id": claim_id,
"notification_type": notification_type,
"data": data
},
priority=5
)
# Глобальный экземпляр
rabbitmq_service = RabbitMQService()