"""Thin publishing client for a single durable RabbitMQ queue."""

import time

import pika
from pika.exceptions import ChannelClosedByBroker, ConnectionClosed, AMQPConnectionError

from config import RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_LOGIN, RABBITMQ_PASS, RABBITMQ_QUEUE

# Errors after which the connection/channel is rebuilt and the publish retried.
_RECONNECT_ERRORS = (ChannelClosedByBroker, ConnectionClosed, AMQPConnectionError)


class RabbitMQClient:
    """Blocking publisher bound to the queue named by ``RABBITMQ_QUEUE``.

    A connection and a channel are opened on construction and transparently
    re-opened by :meth:`publish_message` when they are found closed or when
    publishing fails with a connection-level error.
    """

    def __init__(self):
        # Define the attributes before connecting so that the state checks in
        # ``publish_message``/``close`` never hit a missing attribute.
        self.connection = None
        self.channel = None
        self._connect()

    def _connect(self):
        """Open a fresh connection and channel and declare the durable queue.

        Any previously held connection that is still open is closed first so
        that reconnecting does not leak it (for example after the broker
        closed only the channel).

        Raises:
            AMQPConnectionError: if the broker cannot be reached.
        """
        self._close_quietly()

        credentials = pika.PlainCredentials(RABBITMQ_LOGIN, RABBITMQ_PASS)
        parameters = pika.ConnectionParameters(
            host=RABBITMQ_HOST,
            port=RABBITMQ_PORT,
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=300,
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)

    def _close_quietly(self):
        """Best-effort close of the current connection; drop both handles."""
        stale = self.connection
        self.connection = None
        self.channel = None
        if stale is not None and stale.is_open:
            try:
                stale.close()
            except _RECONNECT_ERRORS:
                # The link is already broken; there is nothing left to release.
                pass

    def _is_usable(self):
        """Return True when both the connection and the channel are open."""
        if not self.connection or self.connection.is_closed:
            return False
        return bool(self.channel) and not self.channel.is_closed

    def publish_message(self, message: str, retry: int = 1) -> None:
        """Publish ``message`` to the queue via the default exchange.

        Args:
            message: Message body to send.
            retry: Number of additional attempts after the first one fails
                with a connection-level error. ``0`` disables retrying.

        Raises:
            ChannelClosedByBroker, ConnectionClosed, AMQPConnectionError:
                the error of the last attempt once all retries are used up.
        """
        retries_left = retry
        while True:
            try:
                # Reconnecting happens inside the guarded region so that a
                # failed reconnect is retried as well instead of escaping.
                if not self._is_usable():
                    self._connect()
                self.channel.basic_publish(
                    exchange='',
                    routing_key=RABBITMQ_QUEUE,
                    body=message,
                    properties=pika.BasicProperties(
                        delivery_mode=2,  # persistent message
                    ),
                )
                return
            except _RECONNECT_ERRORS:
                if retries_left <= 0:
                    raise
                retries_left -= 1
                # Short pause before reconnecting.
                time.sleep(1)
                # Force a full reconnect on the next attempt.
                self._close_quietly()

    def close(self):
        """Close the connection if it is currently open."""
        if self.connection and self.connection.is_open:
            self.connection.close()


# Module-level singleton, created lazily by get_rabbitmq_client().
rabbitmq_client = None


def get_rabbitmq_client():
    """Return the shared :class:`RabbitMQClient`, creating it on first use."""
    global rabbitmq_client
    if rabbitmq_client is None:
        rabbitmq_client = RabbitMQClient()
    return rabbitmq_client