import asyncio import json import logging from telebot import apihelper from aio_pika import connect_robust, Message, DeliveryMode from aio_pika.abc import AbstractIncomingMessage from app.bot.services.mailing_service.parser import parse_region_id from app.bot.services.mailing_service.composer import compose_telegram_message from app.bot.services.mailing_service.db_utils import get_recipients_by_region from config import RABBITMQ_URL_FULL, RABBITMQ_QUEUE, RABBITMQ_NOTIFICATIONS_QUEUE logger = logging.getLogger("TeleBot") rate_limit_semaphore = asyncio.Semaphore(25) class AsyncMailingService: def __init__(self, flask_app, bot): self.flask_app = flask_app self.bot = bot self.loop = asyncio.get_event_loop() async def start(self): await asyncio.gather( self.consume_raw_messages(), self.consume_notifications() ) async def consume_raw_messages(self): while True: try: logger.info("[MailingService] Подключение к RabbitMQ (сырые сообщения)...") connection = await connect_robust(RABBITMQ_URL_FULL, loop=self.loop) async with connection: channel = await connection.channel() await channel.set_qos(prefetch_count=10) raw_queue = await channel.declare_queue(RABBITMQ_QUEUE, durable=True) notifications_queue = await channel.declare_queue(RABBITMQ_NOTIFICATIONS_QUEUE, durable=True) logger.info("[MailingService] Ожидание сообщений из очереди...") async with raw_queue.iterator() as queue_iter: async for message in queue_iter: await self.handle_raw_message(message, channel, notifications_queue) except Exception as e: logger.error(f"[MailingService] Ошибка подключения или обработки: {e}") logger.info("[MailingService] Повторное подключение через 5 секунд...") await asyncio.sleep(5) async def handle_raw_message(self, message: AbstractIncomingMessage, channel, notifications_queue): async with message.process(): with self.flask_app.app_context(): try: data = json.loads(message.body.decode("utf-8")) logger.info(f"[MailingService] Получено сообщение: {json.dumps(data, ensure_ascii=False)}") region_id = parse_region_id(data.get("host")) severity = data.get("severity", "") # Получатели и сообщение параллельно recipients_task = asyncio.to_thread(get_recipients_by_region, self.flask_app, region_id, severity) message_task = asyncio.to_thread(compose_telegram_message, data) recipients, (final_message, link) = await asyncio.gather(recipients_task, message_task) logger.info(f"[MailingService] Получатели: {recipients}") if link: logger.info(f"[MailingService] Сообщение для Telegram: {final_message} {link}") else: logger.info(f"[MailingService] Сообщение для Telegram: {final_message}") # Формируем и публикуем индивидуальные уведомления в очередь отправки for chat_id in recipients: notification_payload = { "chat_id": chat_id, "message": final_message, "link": link or None } body = json.dumps(notification_payload).encode("utf-8") msg = Message(body, delivery_mode=DeliveryMode.PERSISTENT) await channel.default_exchange.publish(msg, routing_key=notifications_queue.name) logger.info(f"[MailingService] Поставлено в очередь уведомлений: {len(recipients)} сообщений") except Exception as e: logger.exception(f"[MailingService] Ошибка обработки сообщения: {e}") async def consume_notifications(self): while True: try: logger.info("[MailingService] Подключение к RabbitMQ (уведомления для отправки)...") connection = await connect_robust(RABBITMQ_URL_FULL, loop=self.loop) async with connection: channel = await connection.channel() await channel.set_qos(prefetch_count=5) notif_queue = await channel.declare_queue(RABBITMQ_NOTIFICATIONS_QUEUE, durable=True) logger.info("[MailingService] Ожидание сообщений из очереди уведомлений...") async with notif_queue.iterator() as queue_iter: async for message in queue_iter: await self.handle_notification_message(message) except Exception as e: logger.error(f"[MailingService] Ошибка подключения или обработки уведомлений: {e}") logger.info("[MailingService] Повторное подключение через 5 секунд...") await asyncio.sleep(5) async def handle_notification_message(self, message: AbstractIncomingMessage): async with message.process(): try: data = json.loads(message.body.decode("utf-8")) chat_id = data.get("chat_id") message_text = data.get("message") link = data.get("link") if link: message_text = f"{message_text} {link}" # TODO: расширить логику отправки с кнопкой, если нужно await self.send_message(chat_id, message_text) except Exception as e: logger.error(f"[MailingService] Ошибка отправки уведомления: {e}") # Можно реализовать message.nack() для повторной попытки async def send_message(self, chat_id, message): telegram_id = "unknown" try: await rate_limit_semaphore.acquire() def get_telegram_id(): with self.flask_app.app_context(): from app.models.users import Users user = Users.query.filter_by(chat_id=chat_id).first() return user.telegram_id if user else "unknown" telegram_id = await asyncio.to_thread(get_telegram_id) await asyncio.to_thread(self.bot.send_message, chat_id, message, parse_mode="HTML") formatted_message = message.replace("\n", " ").replace("\r", "") logger.info(f"[MailingService] Отправлено уведомление {telegram_id} ({chat_id}): {formatted_message}") except apihelper.ApiTelegramException as e: if "429" in str(e): logger.warning(f"[MailingService] Rate limit для {telegram_id} ({chat_id}), ждем и повторяем...") await asyncio.sleep(1) await self.send_message(chat_id, message) else: logger.error(f"[MailingService] Ошибка отправки сообщения {telegram_id} ({chat_id}): {e}") except Exception as e: logger.error(f"[MailingService] Неожиданная ошибка отправки сообщения {telegram_id} ({chat_id}): {e}") finally: rate_limit_semaphore.release()