"""RabbitMQ helpers: publish notifications and relay queued ones to the bot."""

import asyncio
import json
import logging

from app import app, Users
import aio_pika
import pika

from config import RABBITMQ_LOGIN, RABBITMQ_PASS, RABBITMQ_HOST, RABBITMQ_QUEUE, RABBITMQ_URL_FULL

logger = logging.getLogger(__name__)

# Upper bound on notification sends that may be in flight at the same time.
rate_limit_semaphore = asyncio.Semaphore(25)

# Pause between consumer (re)connection attempts, in seconds.
RECONNECT_DELAY_SECONDS = 5


def rabbitmq_connection():
    """Open a blocking RabbitMQ connection and declare the durable queue.

    Returns:
        A ``(connection, channel)`` tuple. The caller owns the connection and
        is responsible for closing it.

    Raises:
        Whatever pika raises while connecting, opening the channel or
        declaring the queue. If the failure happens after the connection was
        opened, the connection is closed before the exception propagates.
    """
    credentials = pika.PlainCredentials(RABBITMQ_LOGIN, RABBITMQ_PASS)
    parameters = pika.ConnectionParameters(
        host=RABBITMQ_HOST,
        credentials=credentials,
        heartbeat=600,
        blocked_connection_timeout=300,
    )
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
    except Exception:
        # Do not leak the socket when channel setup fails.
        if connection.is_open:
            connection.close()
        raise
    return connection, channel


def send_to_queue(message):
    """Publish ``message`` (JSON-serialised) to the RabbitMQ queue.

    A fresh connection is opened for each call and is always closed again,
    including when serialisation or publishing fails.

    Args:
        message: JSON-serialisable payload to enqueue.
    """
    connection, channel = rabbitmq_connection()
    try:
        channel.basic_publish(
            exchange='',
            routing_key=RABBITMQ_QUEUE,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # AMQP "persistent" delivery mode
            ),
        )
    finally:
        # The connection may already be closed if publishing failed.
        if connection.is_open:
            connection.close()


async def send_message(chat_id, message, backend_bot, is_notification=False):
    """Send ``message`` to ``chat_id`` through ``backend_bot`` and log the result.

    Blocking work (the user lookup and the bot call) runs in worker threads
    via ``asyncio.to_thread``. Any ``Exception`` raised while sending is
    logged and swallowed, so this coroutine does not raise for send failures.

    Args:
        chat_id: Target chat id; also used to look up the user for logging.
        message: Text to send (sent with ``parse_mode='HTML'``).
        backend_bot: Object exposing ``bot.send_message``.
        is_notification: When true, the send is guarded by
            ``rate_limit_semaphore``.
    """
    telegram_id = "unknown"
    acquired = False
    try:
        if is_notification:
            await rate_limit_semaphore.acquire()
            # Only mark the permit as held once acquire() really returned, so
            # a cancellation while waiting does not trigger a bogus release.
            acquired = True

        def get_user():
            with app.app_context():
                user = Users.query.get(chat_id)
                return user.telegram_id if user else "unknown"

        telegram_id = await asyncio.to_thread(get_user)
        await asyncio.to_thread(
            backend_bot.bot.send_message, chat_id, message, parse_mode='HTML'
        )

        # Flatten the text so the log entry stays on a single line.
        formatted_message = message.replace('\n', ' ').replace('\r', '')
        logger.info(
            'Send notification to %s (%s) from RabbitMQ [%s]',
            telegram_id, chat_id, formatted_message,
        )
    except Exception as e:
        logger.error("Error sending message to %s (%s): %s", telegram_id, chat_id, e)
    finally:
        if acquired:
            rate_limit_semaphore.release()


async def consume_from_queue(backend_bot):
    """Consume notifications from RabbitMQ forever and forward them to the bot.

    Each message body is expected to be a JSON object with ``"chat_id"`` and
    ``"message"`` keys. Messages are handled inside ``message.process()``;
    malformed payloads and send errors are logged and do not stop the loop.
    When the connection ends or fails, the consumer waits
    ``RECONNECT_DELAY_SECONDS`` and connects again.

    Args:
        backend_bot: Object passed through to ``send_message``.
    """
    while True:
        try:
            connection = await aio_pika.connect_robust(RABBITMQ_URL_FULL)
            async with connection:
                channel = await connection.channel()
                queue = await channel.declare_queue(RABBITMQ_QUEUE, durable=True)

                async for message in queue:
                    async with message.process():
                        try:
                            data = json.loads(message.body.decode('utf-8'))
                            chat_id = data["chat_id"]
                            message_text = data["message"]
                            await send_message(
                                chat_id, message_text, backend_bot, is_notification=True
                            )
                        except (json.JSONDecodeError, UnicodeDecodeError,
                                KeyError, TypeError) as e:
                            # Malformed payload: bad bytes, bad JSON, not an
                            # object, or missing one of the required keys.
                            logger.error("Error processing message: %s", e)
                        except Exception as e:
                            logger.error("Error sending message: %s", e)
        except aio_pika.exceptions.AMQPError as e:
            logger.error("RabbitMQ error: %s", e)
        except Exception as e:
            logger.error("Critical error: %s", e)

        # Deliberately outside ``finally``: a CancelledError must propagate
        # immediately instead of being held up by the reconnect delay.
        await asyncio.sleep(RECONNECT_DELAY_SECONDS)