diff --git a/app/bot/services/mailing_service/mailing_consumer.py b/app/bot/services/mailing_service/mailing_consumer.py index 60b5d8b..d304cdd 100644 --- a/app/bot/services/mailing_service/mailing_consumer.py +++ b/app/bot/services/mailing_service/mailing_consumer.py @@ -1,17 +1,21 @@ import asyncio import json import logging +from typing import Optional -from telebot import apihelper from aio_pika import connect_robust, Message, DeliveryMode from aio_pika.abc import AbstractIncomingMessage -from app.bot.services.mailing_service.parser import parse_region_id +from telebot import apihelper + from app.bot.services.mailing_service.composer import compose_telegram_message from app.bot.services.mailing_service.db_utils import get_recipients_by_region -from config import RABBITMQ_URL_FULL, RABBITMQ_QUEUE, RABBITMQ_NOTIFICATIONS_QUEUE +from app.bot.services.mailing_service.parser import parse_region_id +from app.bot.services.mailing_service.utils.link_button import create_link_button +from app.bot.services.mailing_service.utils.tg_get_user import get_telegram_id_by_chat_id +from config import RABBITMQ_URL_FULL, RABBITMQ_QUEUE, RABBITMQ_NOTIFICATIONS_QUEUE, MAILING_RATE_LIMIT logger = logging.getLogger("TeleBot") -rate_limit_semaphore = asyncio.Semaphore(25) +rate_limit_semaphore = asyncio.Semaphore(MAILING_RATE_LIMIT) class AsyncMailingService: def __init__(self, flask_app, bot): @@ -28,7 +32,7 @@ class AsyncMailingService: async def consume_raw_messages(self): while True: try: - logger.info("[MailingService] Подключение к RabbitMQ (сырые сообщения)...") + logger.info("[MailingService][raw] Подключение к RabbitMQ (сырые сообщения)...") connection = await connect_robust(RABBITMQ_URL_FULL, loop=self.loop) async with connection: channel = await connection.channel() @@ -36,14 +40,14 @@ class AsyncMailingService: raw_queue = await channel.declare_queue(RABBITMQ_QUEUE, durable=True) notifications_queue = await channel.declare_queue(RABBITMQ_NOTIFICATIONS_QUEUE, durable=True) - logger.info("[MailingService] Ожидание сообщений из очереди...") + logger.info("[MailingService][raw] Ожидание сообщений из очереди...") async with raw_queue.iterator() as queue_iter: async for message in queue_iter: await self.handle_raw_message(message, channel, notifications_queue) except Exception as e: - logger.error(f"[MailingService] Ошибка подключения или обработки: {e}") - logger.info("[MailingService] Повторное подключение через 5 секунд...") + logger.error(f"[MailingService][raw] Ошибка подключения или обработки: {e}") + logger.info("[MailingService][raw] Повторное подключение через 5 секунд...") await asyncio.sleep(5) async def handle_raw_message(self, message: AbstractIncomingMessage, channel, notifications_queue): @@ -51,9 +55,9 @@ class AsyncMailingService: with self.flask_app.app_context(): try: data = json.loads(message.body.decode("utf-8")) - logger.info(f"[MailingService] Получено сообщение: {json.dumps(data, ensure_ascii=False)}") + logger.info(f"[MailingService][raw] Получены данные: {json.dumps(data, ensure_ascii=False)}") - region_id = parse_region_id(data.get("host")) + region_id = int(parse_region_id(data.get("hostgroups"))) severity = data.get("severity", "") # Получатели и сообщение параллельно @@ -61,12 +65,8 @@ class AsyncMailingService: message_task = asyncio.to_thread(compose_telegram_message, data) recipients, (final_message, link) = await asyncio.gather(recipients_task, message_task) - - logger.info(f"[MailingService] Получатели: {recipients}") - if link: - logger.info(f"[MailingService] Сообщение для Telegram: {final_message} {link}") - else: - logger.info(f"[MailingService] Сообщение для Telegram: {final_message}") + logger.info(f"[MailingService][raw] Формирование списка получателей ({len(recipients)})") + logger.debug(f"[MailingService][raw] Список получателей: {', '.join(map(str, recipients))}") # Формируем и публикуем индивидуальные уведомления в очередь отправки for chat_id in recipients: @@ -79,29 +79,29 @@ class AsyncMailingService: msg = Message(body, delivery_mode=DeliveryMode.PERSISTENT) await channel.default_exchange.publish(msg, routing_key=notifications_queue.name) - logger.info(f"[MailingService] Поставлено в очередь уведомлений: {len(recipients)} сообщений") + logger.info(f"[MailingService][raw] Поставлено в очередь уведомлений: {len(recipients)} сообщений") except Exception as e: - logger.exception(f"[MailingService] Ошибка обработки сообщения: {e}") + logger.exception(f"[MailingService][raw] Ошибка обработки сообщения: {e}") async def consume_notifications(self): while True: try: - logger.info("[MailingService] Подключение к RabbitMQ (уведомления для отправки)...") + logger.info("[MailingService][formated] Подключение к RabbitMQ (уведомления для отправки)...") connection = await connect_robust(RABBITMQ_URL_FULL, loop=self.loop) async with connection: channel = await connection.channel() await channel.set_qos(prefetch_count=5) notif_queue = await channel.declare_queue(RABBITMQ_NOTIFICATIONS_QUEUE, durable=True) - logger.info("[MailingService] Ожидание сообщений из очереди уведомлений...") + logger.info("[MailingService][formated] Ожидание сообщений из очереди уведомлений...") async with notif_queue.iterator() as queue_iter: async for message in queue_iter: await self.handle_notification_message(message) except Exception as e: - logger.error(f"[MailingService] Ошибка подключения или обработки уведомлений: {e}") - logger.info("[MailingService] Повторное подключение через 5 секунд...") + logger.error(f"[MailingService][formated] Ошибка подключения или обработки уведомлений: {e}") + logger.info("[MailingService][formated] Повторное подключение через 5 секунд...") await asyncio.sleep(5) async def handle_notification_message(self, message: AbstractIncomingMessage): @@ -111,41 +111,60 @@ class AsyncMailingService: chat_id = data.get("chat_id") message_text = data.get("message") link = data.get("link") - if link: - message_text = f"{message_text} {link}" - # TODO: расширить логику отправки с кнопкой, если нужно - await self.send_message(chat_id, message_text) + await self.send_message(chat_id, message_text, link) except Exception as e: - logger.error(f"[MailingService] Ошибка отправки уведомления: {e}") + logger.error(f"[MailingService][tg message] Ошибка отправки уведомления: {e}") # Можно реализовать message.nack() для повторной попытки - async def send_message(self, chat_id, message): + async def send_message(self, chat_id, message, link: Optional[str] = None): telegram_id = "unknown" try: await rate_limit_semaphore.acquire() - def get_telegram_id(): - with self.flask_app.app_context(): - from app.models.users import Users - user = Users.query.filter_by(chat_id=chat_id).first() - return user.telegram_id if user else "unknown" + # Получаем telegram_id в отдельном потоке + telegram_id = await asyncio.to_thread(get_telegram_id_by_chat_id, self.flask_app, chat_id) - telegram_id = await asyncio.to_thread(get_telegram_id) + # Формируем клавиатуру с кнопкой + link_button = create_link_button(link) - await asyncio.to_thread(self.bot.send_message, chat_id, message, parse_mode="HTML") + await asyncio.to_thread( + self.bot.send_message, + chat_id, + message, + parse_mode="HTML", + reply_markup=link_button + ) formatted_message = message.replace("\n", " ").replace("\r", "") - logger.info(f"[MailingService] Отправлено уведомление {telegram_id} ({chat_id}): {formatted_message}") + logger.info(f"[MailingService][tg message] Отправлено уведомление {telegram_id} ({chat_id}): {formatted_message}") except apihelper.ApiTelegramException as e: - if "429" in str(e): - logger.warning(f"[MailingService] Rate limit для {telegram_id} ({chat_id}), ждем и повторяем...") + # Попытка получить error_code и описание из исключения + error_code = None + description = None + try: + if hasattr(e, "result") and e.result: + error_code = e.result.get("error_code") + description = e.result.get("description") + except Exception: + pass + + if error_code == 429: + logger.warning(f"[MailingService][tg message] Rate limit для {telegram_id} ({chat_id}), ждем и повторяем...") await asyncio.sleep(1) - await self.send_message(chat_id, message) + await self.send_message(chat_id, message, link) + elif error_code in (400, 403) and description and ( + "user is deactivated" in description.lower() or + "bot was blocked by the user" in description.lower() or + "user not found" in description.lower() + ): + logger.warning(f"[MailingService][tg message] Ошибка {error_code}: {description}. Пользователь {telegram_id} ({chat_id}) заблокировал бота или не существует. Отправка пропущена.") + # Не повторяем отправку else: - logger.error(f"[MailingService] Ошибка отправки сообщения {telegram_id} ({chat_id}): {e}") + logger.error(f"[MailingService][tg message] Ошибка отправки сообщения {telegram_id} ({chat_id}): {e}") except Exception as e: - logger.error(f"[MailingService] Неожиданная ошибка отправки сообщения {telegram_id} ({chat_id}): {e}") + logger.error(f"[MailingService][tg message] Неожиданная ошибка отправки сообщения {telegram_id} ({chat_id}): {e}") finally: rate_limit_semaphore.release() + diff --git a/app/bot/services/mailing_service/parser.py b/app/bot/services/mailing_service/parser.py index 3be2cf6..6fce2eb 100644 --- a/app/bot/services/mailing_service/parser.py +++ b/app/bot/services/mailing_service/parser.py @@ -1,19 +1,31 @@ # parser.py -import re -def parse_region_id(host: str) -> int | None: - """ - Извлекает region_id из строки host. - Формат: p<...>, например p18ecpapp01 → region_id = 18 +# def parse_region_id(host: str) -> int | None: +# """ +# Извлекает region_id из строки host. +# Формат: p<...>, например p18ecpapp01 → region_id = 18 +# +# Returns: +# int | None: номер региона или None +# """ +# if not host or not host.startswith("p"): +# return None +# +# match = re.match(r"^p(\d+)", host) +# if match: +# return int(match.group(1)) +# return None - Returns: - int | None: номер региона или None - """ - if not host or not host.startswith("p"): - return None +def parse_region_id(hostgroups: str | None) -> str: + if not hostgroups: + return "0" - match = re.match(r"^p(\d+)", host) - if match: - return int(match.group(1)) - return None + if hostgroups == "p00rtmis": + return "0" + + parts = hostgroups.split("_") + if len(parts) >= 2: + return parts[1] + + return "0" diff --git a/app/bot/services/mailing_service/utils/link_button.py b/app/bot/services/mailing_service/utils/link_button.py new file mode 100644 index 0000000..0d81a97 --- /dev/null +++ b/app/bot/services/mailing_service/utils/link_button.py @@ -0,0 +1,17 @@ +from typing import Optional + +from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton + + +def create_link_button(link: Optional[str]) -> Optional[InlineKeyboardMarkup]: + """ + Создаёт InlineKeyboardMarkup с кнопкой-ссылкой. + Если ссылка не передана, возвращает None. + """ + if not link: + return None + + markup = InlineKeyboardMarkup() + button = InlineKeyboardButton(text="График", url=link) + markup.add(button) + return markup diff --git a/app/bot/services/mailing_service/utils/tg_get_user.py b/app/bot/services/mailing_service/utils/tg_get_user.py new file mode 100644 index 0000000..78096a3 --- /dev/null +++ b/app/bot/services/mailing_service/utils/tg_get_user.py @@ -0,0 +1,6 @@ +def get_telegram_id_by_chat_id(flask_app, chat_id: int) -> str: + from app.models.users import Users # импорт здесь, чтобы избежать циклических зависимостей + + with flask_app.app_context(): + user = Users.query.filter_by(chat_id=chat_id).first() + return user.telegram_id if user else "unknown"