import asyncio import json import logging from typing import Optional from aio_pika import connect_robust, Message, DeliveryMode from aio_pika.abc import AbstractIncomingMessage from telebot import apihelper from app.bot.services.mailing_service.composer import compose_telegram_message from app.bot.services.mailing_service.db_utils import get_recipients_by_region from app.bot.services.mailing_service.parser import parse_region_id from app.bot.services.mailing_service.utils.link_button import create_link_button from app.bot.services.mailing_service.utils.tg_get_user import get_telegram_id_by_chat_id from config import RABBITMQ_URL_FULL, RABBITMQ_QUEUE, RABBITMQ_NOTIFICATIONS_QUEUE, MAILING_RATE_LIMIT logger = logging.getLogger("TeleBot") rate_limit_semaphore = asyncio.Semaphore(MAILING_RATE_LIMIT) class AsyncMailingService: def __init__(self, flask_app, bot): self.flask_app = flask_app self.bot = bot self.loop = asyncio.get_event_loop() async def start(self): await asyncio.gather( self.consume_raw_messages(), self.consume_notifications() ) async def consume_raw_messages(self): while True: try: logger.info("[MailingService][raw] Подключение к RabbitMQ (сырые сообщения)...") connection = await connect_robust(RABBITMQ_URL_FULL, loop=self.loop) async with connection: channel = await connection.channel() await channel.set_qos(prefetch_count=10) raw_queue = await channel.declare_queue(RABBITMQ_QUEUE, durable=True) notifications_queue = await channel.declare_queue(RABBITMQ_NOTIFICATIONS_QUEUE, durable=True) logger.info("[MailingService][raw] Ожидание сообщений из очереди...") async with raw_queue.iterator() as queue_iter: async for message in queue_iter: await self.handle_raw_message(message, channel, notifications_queue) except Exception as e: logger.error(f"[MailingService][raw] Ошибка подключения или обработки: {e}") logger.info("[MailingService][raw] Повторное подключение через 5 секунд...") await asyncio.sleep(5) async def handle_raw_message(self, message: AbstractIncomingMessage, channel, notifications_queue): async with message.process(): with self.flask_app.app_context(): try: data = json.loads(message.body.decode("utf-8")) logger.info(f"[MailingService][raw] Получены данные: {json.dumps(data, ensure_ascii=False)}") region_id = int(parse_region_id(data.get("hostgroups"))) severity = data.get("severity", "") # Получатели и сообщение параллельно recipients_task = asyncio.to_thread(get_recipients_by_region, self.flask_app, region_id, severity) message_task = asyncio.to_thread(compose_telegram_message, data) recipients, (final_message, link) = await asyncio.gather(recipients_task, message_task) logger.info(f"[MailingService][raw] Формирование списка получателей ({len(recipients)})") logger.debug(f"[MailingService][raw] Список получателей: {', '.join(map(str, recipients))}") # Формируем и публикуем индивидуальные уведомления в очередь отправки for chat_id in recipients: notification_payload = { "chat_id": chat_id, "message": final_message, "link": link or None } body = json.dumps(notification_payload).encode("utf-8") msg = Message(body, delivery_mode=DeliveryMode.PERSISTENT) await channel.default_exchange.publish(msg, routing_key=notifications_queue.name) logger.info(f"[MailingService][raw] Поставлено в очередь уведомлений: {len(recipients)} сообщений") except Exception as e: logger.exception(f"[MailingService][raw] Ошибка обработки сообщения: {e}") async def consume_notifications(self): while True: try: logger.info("[MailingService][formated] Подключение к RabbitMQ (уведомления для отправки)...") connection = await connect_robust(RABBITMQ_URL_FULL, loop=self.loop) async with connection: channel = await connection.channel() await channel.set_qos(prefetch_count=5) notif_queue = await channel.declare_queue(RABBITMQ_NOTIFICATIONS_QUEUE, durable=True) logger.info("[MailingService][formated] Ожидание сообщений из очереди уведомлений...") async with notif_queue.iterator() as queue_iter: async for message in queue_iter: await self.handle_notification_message(message) except Exception as e: logger.error(f"[MailingService][formated] Ошибка подключения или обработки уведомлений: {e}") logger.info("[MailingService][formated] Повторное подключение через 5 секунд...") await asyncio.sleep(5) async def handle_notification_message(self, message: AbstractIncomingMessage): async with message.process(): try: data = json.loads(message.body.decode("utf-8")) chat_id = data.get("chat_id") message_text = data.get("message") link = data.get("link") await self.send_message(chat_id, message_text, link) except Exception as e: logger.error(f"[MailingService][tg message] Ошибка отправки уведомления: {e}") # Можно реализовать message.nack() для повторной попытки async def send_message(self, chat_id, message, link: Optional[str] = None): telegram_id = "unknown" try: await rate_limit_semaphore.acquire() # Получаем telegram_id в отдельном потоке telegram_id = await asyncio.to_thread(get_telegram_id_by_chat_id, self.flask_app, chat_id) # Формируем клавиатуру с кнопкой link_button = create_link_button(link) await asyncio.to_thread( self.bot.send_message, chat_id, message, parse_mode="HTML", reply_markup=link_button ) formatted_message = message.replace("\n", " ").replace("\r", "") logger.info(f"[MailingService][tg message] Отправлено уведомление {telegram_id} ({chat_id}): {formatted_message}") except apihelper.ApiTelegramException as e: # Попытка получить error_code и описание из исключения error_code = None description = None try: if hasattr(e, "result") and e.result: error_code = e.result.get("error_code") description = e.result.get("description") except Exception: pass if error_code == 429: logger.warning(f"[MailingService][tg message] Rate limit для {telegram_id} ({chat_id}), ждем и повторяем...") await asyncio.sleep(1) await self.send_message(chat_id, message, link) elif error_code in (400, 403) and description and ( "user is deactivated" in description.lower() or "bot was blocked by the user" in description.lower() or "user not found" in description.lower() ): logger.warning(f"[MailingService][tg message] Ошибка {error_code}: {description}. Пользователь {telegram_id} ({chat_id}) заблокировал бота или не существует. Отправка пропущена.") # Не повторяем отправку else: logger.error(f"[MailingService][tg message] Ошибка отправки сообщения {telegram_id} ({chat_id}): {e}") except Exception as e: logger.error(f"[MailingService][tg message] Неожиданная ошибка отправки сообщения {telegram_id} ({chat_id}): {e}") finally: rate_limit_semaphore.release()