Telezab/rabbitmq.py

156 lines
7.0 KiB
Python

import asyncio
import json
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import aio_pika
import aiohttp
import pika
import telebot
import backend_bot
from config import RABBITMQ_LOGIN, RABBITMQ_PASS, RABBITMQ_HOST, RABBITMQ_QUEUE, RABBITMQ_URL_FULL
# Semaphore for rate limiting
rate_limit_semaphore = asyncio.Semaphore(25)
def rabbitmq_connection():
# Создаем объект учетных данных
credentials = pika.PlainCredentials(RABBITMQ_LOGIN, RABBITMQ_PASS)
# Указываем параметры подключения, включая учетные данные
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
credentials=credentials, # Передаем учетные данные
heartbeat=600, # Интервал heartbeat для поддержания соединения
blocked_connection_timeout=300 # Таймаут блокировки соединения
)
# Создаем подключение и канал
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
return connection, channel
def send_to_queue(message):
connection, channel = rabbitmq_connection()
channel.basic_publish(
exchange='',
routing_key=RABBITMQ_QUEUE,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
connection.close()
async def consume_from_queue():
while True: # Бесконечный цикл для переподключения
try:
# Подключение к RabbitMQ
connection = await aio_pika.connect_robust(RABBITMQ_URL_FULL)
async with connection:
# Открываем канал
channel = await connection.channel()
# Объявляем очередь (если нужно)
queue = await channel.declare_queue(RABBITMQ_QUEUE, durable=True)
# Потребляем сообщения
async for message in queue:
async with message.process(): # Авто подтверждение сообщения
try:
# Парсинг сообщения
data = json.loads(message.body)
# Проверка структуры сообщения
if not isinstance(data, dict):
raise ValueError("Invalid message format: Expected a dictionary")
# Извлечение необходимых данных
chat_id = data.get("chat_id")
username = data.get("username")
message_text = data.get("message")
# Проверка обязательных полей
if not all([chat_id, username, message_text]):
raise ValueError(f"Missing required fields in message: {data}")
# Отправляем сообщение
await send_notification_message(chat_id, message_text, username)
except json.JSONDecodeError:
# Логируем некорректный JSON
telebot.logger.error(f"Failed to decode message: {message.body}")
except ValueError as ve:
# Логируем ошибку формата сообщения
telebot.logger.error(f"Invalid message: {ve}")
except Exception as e:
# Логируем общую ошибку при обработке
telebot.logger.error(f"Error sending message from queue: {e}")
except aio_pika.exceptions.AMQPError as e:
# Логируем ошибку RabbitMQ и переподключаемся
telebot.logger.error(f"RabbitMQ error: {e}")
except Exception as e:
# Логируем общую ошибку и ждем перед переподключением
telebot.logger.error(f"Critical error in consume_from_queue: {e}")
finally:
# Задержка перед переподключением
await asyncio.sleep(5)
async def send_message(chat_id, message, is_notification=False):
try:
if is_notification:
await rate_limit_semaphore.acquire()
parse_mode = 'HTML'
# Используем partial для передачи именованных аргументов в bot.send_message
func_with_args = partial(backend_bot.bot.send_message, chat_id=chat_id, text=message, parse_mode=parse_mode)
# Передаем подготовленную функцию в run_in_executor
await run_in_executor(func_with_args)
except telebot.apihelper.ApiTelegramException as e:
if "429" in str(e):
telebot.logger.warning(f"Rate limit exceeded for chat_id {chat_id}. Retrying...")
await asyncio.sleep(1)
await send_message(chat_id, message, is_notification)
elif "403" in str(e):
telebot.logger.warning(f"Can't send message to user because bot blocked by user with chat id: {chat_id}")
pass
else:
telebot.logger.error(f"Failed to send message to {chat_id}: {e}")
telebot.logger.error(f"Detailed Error: {e}", exc_info=True) # Добавлено логирование исключения
except Exception as e:
username = f"@{message.from_user.username}" if message.from_user.username else "N/A"
telebot.logger.error(f"Unexpected error while sending message to {username} {chat_id}: {e}", exc_info=True)
await check_telegram_api()
finally:
if is_notification:
rate_limit_semaphore.release()
formatted_message = message.replace('\n', ' ').replace('\r', '')
telebot.logger.info(f'Send notification to {chat_id} from RabbitMQ [{formatted_message}]')
async def send_notification_message(chat_id, message, username):
await send_message(chat_id, message, is_notification=True)
async def run_in_executor(func, *args):
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
return await loop.run_in_executor(pool, func, *args)
async def check_telegram_api():
try:
async with aiohttp.ClientSession() as session:
async with session.get('https://api.telegram.org') as response:
if response.status == 200:
telebot.logger.info("Telegram API is reachable.")
else:
telebot.logger.error("Telegram API is not reachable.")
except Exception as e:
telebot.logger.error(f"Error checking Telegram API: {e}")