171 lines
9.1 KiB
Python
171 lines
9.1 KiB
Python
import asyncio
|
||
import json
|
||
import logging
|
||
from typing import Optional
|
||
|
||
from aio_pika import connect_robust, Message, DeliveryMode
|
||
from aio_pika.abc import AbstractIncomingMessage
|
||
from telebot import apihelper
|
||
|
||
from app.bot.services.mailing_service.composer import compose_telegram_message
|
||
from app.bot.services.mailing_service.db_utils import get_recipients_by_region
|
||
from app.bot.services.mailing_service.parser import parse_region_id
|
||
from app.bot.services.mailing_service.utils.link_button import create_link_button
|
||
from app.bot.services.mailing_service.utils.tg_get_user import get_telegram_id_by_chat_id
|
||
from config import RABBITMQ_URL_FULL, RABBITMQ_QUEUE, RABBITMQ_NOTIFICATIONS_QUEUE, MAILING_RATE_LIMIT
|
||
|
||
logger = logging.getLogger("TeleBot")
|
||
rate_limit_semaphore = asyncio.Semaphore(MAILING_RATE_LIMIT)
|
||
|
||
class AsyncMailingService:
|
||
def __init__(self, flask_app, bot):
|
||
self.flask_app = flask_app
|
||
self.bot = bot
|
||
self.loop = asyncio.get_event_loop()
|
||
|
||
async def start(self):
|
||
await asyncio.gather(
|
||
self.consume_raw_messages(),
|
||
self.consume_notifications()
|
||
)
|
||
|
||
async def consume_raw_messages(self):
|
||
while True:
|
||
try:
|
||
logger.info("[MailingService][raw] Подключение к RabbitMQ (сырые сообщения)...")
|
||
connection = await connect_robust(RABBITMQ_URL_FULL, loop=self.loop)
|
||
async with connection:
|
||
channel = await connection.channel()
|
||
await channel.set_qos(prefetch_count=10)
|
||
raw_queue = await channel.declare_queue(RABBITMQ_QUEUE, durable=True)
|
||
notifications_queue = await channel.declare_queue(RABBITMQ_NOTIFICATIONS_QUEUE, durable=True)
|
||
|
||
logger.info("[MailingService][raw] Ожидание сообщений из очереди...")
|
||
|
||
async with raw_queue.iterator() as queue_iter:
|
||
async for message in queue_iter:
|
||
await self.handle_raw_message(message, channel, notifications_queue)
|
||
except Exception as e:
|
||
logger.error(f"[MailingService][raw] Ошибка подключения или обработки: {e}")
|
||
logger.info("[MailingService][raw] Повторное подключение через 5 секунд...")
|
||
await asyncio.sleep(5)
|
||
|
||
async def handle_raw_message(self, message: AbstractIncomingMessage, channel, notifications_queue):
|
||
async with message.process():
|
||
with self.flask_app.app_context():
|
||
try:
|
||
data = json.loads(message.body.decode("utf-8"))
|
||
logger.info(f"[MailingService][raw] Получены данные: {json.dumps(data, ensure_ascii=False)}")
|
||
|
||
region_id = int(parse_region_id(data.get("hostgroups")))
|
||
severity = data.get("severity", "")
|
||
|
||
# Получатели и сообщение параллельно
|
||
recipients_task = asyncio.to_thread(get_recipients_by_region, self.flask_app, region_id, severity)
|
||
message_task = asyncio.to_thread(compose_telegram_message, data)
|
||
|
||
recipients, (final_message, link) = await asyncio.gather(recipients_task, message_task)
|
||
logger.info(f"[MailingService][raw] Формирование списка получателей ({len(recipients)})")
|
||
logger.debug(f"[MailingService][raw] Список получателей: {', '.join(map(str, recipients))}")
|
||
|
||
# Формируем и публикуем индивидуальные уведомления в очередь отправки
|
||
for chat_id in recipients:
|
||
notification_payload = {
|
||
"chat_id": chat_id,
|
||
"message": final_message,
|
||
"link": link or None
|
||
}
|
||
body = json.dumps(notification_payload).encode("utf-8")
|
||
msg = Message(body, delivery_mode=DeliveryMode.PERSISTENT)
|
||
await channel.default_exchange.publish(msg, routing_key=notifications_queue.name)
|
||
|
||
logger.info(f"[MailingService][raw] Поставлено в очередь уведомлений: {len(recipients)} сообщений")
|
||
|
||
except Exception as e:
|
||
logger.exception(f"[MailingService][raw] Ошибка обработки сообщения: {e}")
|
||
|
||
async def consume_notifications(self):
|
||
while True:
|
||
try:
|
||
logger.info("[MailingService][formated] Подключение к RabbitMQ (уведомления для отправки)...")
|
||
connection = await connect_robust(RABBITMQ_URL_FULL, loop=self.loop)
|
||
async with connection:
|
||
channel = await connection.channel()
|
||
await channel.set_qos(prefetch_count=5)
|
||
notif_queue = await channel.declare_queue(RABBITMQ_NOTIFICATIONS_QUEUE, durable=True)
|
||
|
||
logger.info("[MailingService][formated] Ожидание сообщений из очереди уведомлений...")
|
||
|
||
async with notif_queue.iterator() as queue_iter:
|
||
async for message in queue_iter:
|
||
await self.handle_notification_message(message)
|
||
except Exception as e:
|
||
logger.error(f"[MailingService][formated] Ошибка подключения или обработки уведомлений: {e}")
|
||
logger.info("[MailingService][formated] Повторное подключение через 5 секунд...")
|
||
await asyncio.sleep(5)
|
||
|
||
async def handle_notification_message(self, message: AbstractIncomingMessage):
|
||
async with message.process():
|
||
try:
|
||
data = json.loads(message.body.decode("utf-8"))
|
||
chat_id = data.get("chat_id")
|
||
message_text = data.get("message")
|
||
link = data.get("link")
|
||
|
||
await self.send_message(chat_id, message_text, link)
|
||
except Exception as e:
|
||
logger.error(f"[MailingService][tg message] Ошибка отправки уведомления: {e}")
|
||
# Можно реализовать message.nack() для повторной попытки
|
||
|
||
async def send_message(self, chat_id, message, link: Optional[str] = None):
|
||
telegram_id = "unknown"
|
||
try:
|
||
await rate_limit_semaphore.acquire()
|
||
|
||
# Получаем telegram_id в отдельном потоке
|
||
telegram_id = await asyncio.to_thread(get_telegram_id_by_chat_id, self.flask_app, chat_id)
|
||
|
||
# Формируем клавиатуру с кнопкой
|
||
link_button = create_link_button(link)
|
||
|
||
await asyncio.to_thread(
|
||
self.bot.send_message,
|
||
chat_id,
|
||
message,
|
||
parse_mode="HTML",
|
||
reply_markup=link_button
|
||
)
|
||
|
||
formatted_message = message.replace("\n", " ").replace("\r", "")
|
||
logger.info(f"[MailingService][tg message] Отправлено уведомление {telegram_id} ({chat_id}): {formatted_message}")
|
||
|
||
except apihelper.ApiTelegramException as e:
|
||
# Попытка получить error_code и описание из исключения
|
||
error_code = None
|
||
description = None
|
||
try:
|
||
if hasattr(e, "result") and e.result:
|
||
error_code = e.result.get("error_code")
|
||
description = e.result.get("description")
|
||
except Exception:
|
||
pass
|
||
|
||
if error_code == 429:
|
||
logger.warning(f"[MailingService][tg message] Rate limit для {telegram_id} ({chat_id}), ждем и повторяем...")
|
||
await asyncio.sleep(1)
|
||
await self.send_message(chat_id, message, link)
|
||
elif error_code in (400, 403) and description and (
|
||
"user is deactivated" in description.lower() or
|
||
"bot was blocked by the user" in description.lower() or
|
||
"user not found" in description.lower()
|
||
):
|
||
logger.warning(f"[MailingService][tg message] Ошибка {error_code}: {description}. Пользователь {telegram_id} ({chat_id}) заблокировал бота или не существует. Отправка пропущена.")
|
||
# Не повторяем отправку
|
||
else:
|
||
logger.error(f"[MailingService][tg message] Ошибка отправки сообщения {telegram_id} ({chat_id}): {e}")
|
||
except Exception as e:
|
||
logger.error(f"[MailingService][tg message] Неожиданная ошибка отправки сообщения {telegram_id} ({chat_id}): {e}")
|
||
finally:
|
||
rate_limit_semaphore.release()
|
||
|