import os import asyncio import logging import json import time import pika from concurrent.futures import ThreadPoolExecutor from functools import partial # RabbitMQ configuration RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost') RABBITMQ_QUEUE = 'telegram_notifications' RABBITMQ_LOGIN = os.getenv('RABBITMQ_LOGIN') RABBITMQ_PASS = os.getenv('RABBITMQ_PASS') # Импорт функций для отправки сообщений from telezab import send_notification_message # Замените на актуальный путь к send_notification_message class RabbitMQWorker: def __init__(self): self.connection = None self.channel = None def rabbitmq_connection(self): """Устанавливает подключение к RabbitMQ.""" try: credentials = pika.PlainCredentials(RABBITMQ_LOGIN, RABBITMQ_PASS) parameters = pika.ConnectionParameters( host=RABBITMQ_HOST, credentials=credentials, heartbeat=600, blocked_connection_timeout=300 ) self.connection = pika.BlockingConnection(parameters) self.channel = self.connection.channel() self.channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True) logging.info("RabbitMQ connection established.") except Exception as e: logging.error(f"Error establishing RabbitMQ connection: {e}") self.connection = None self.channel = None async def consume_from_queue(self): """Основной цикл обработки сообщений из RabbitMQ.""" while True: try: if not self.connection or self.connection.is_closed: self.rabbitmq_connection() for method_frame, properties, body in self.channel.consume(RABBITMQ_QUEUE, inactivity_timeout=5): if not method_frame: continue # Нет новых сообщений, продолжаем ждать # Декодируем сообщение из очереди message = json.loads(body) chat_id = message['chat_id'] username = message['username'] message_text = message['message'] try: # Отправляем сообщение await send_notification_message(chat_id, message_text, username) self.channel.basic_ack(method_frame.delivery_tag) # Подтверждаем получение except Exception as e: logging.error(f"Error processing message: {e}") self.channel.basic_nack(method_frame.delivery_tag) # Возвращаем сообщение в очередь except pika.exceptions.AMQPConnectionError as e: logging.error(f"RabbitMQ connection error: {e}. Reconnecting in 5 seconds...") self.close_connection() await asyncio.sleep(5) except Exception as e: logging.error(f"Unexpected error in RabbitMQ consumer: {e}") await asyncio.sleep(5) def close_connection(self): """Закрывает соединение с RabbitMQ.""" if self.connection and not self.connection.is_closed: self.connection.close() logging.info("RabbitMQ connection closed.") async def run(self): """Запускает основной цикл обработки очереди.""" try: await self.consume_from_queue() except asyncio.CancelledError: logging.info("RabbitMQ consumer stopped.") finally: self.close_connection()