Telezab/rabbitmq_worker.py
2025-02-09 09:58:02 +05:00

91 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import asyncio
import logging
import json
import time
import pika
from concurrent.futures import ThreadPoolExecutor
from functools import partial
# RabbitMQ configuration
RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost')
RABBITMQ_QUEUE = 'telegram_notifications'
RABBITMQ_LOGIN = os.getenv('RABBITMQ_LOGIN')
RABBITMQ_PASS = os.getenv('RABBITMQ_PASS')
# Импорт функций для отправки сообщений
from telezab import send_notification_message # Замените на актуальный путь к send_notification_message
class RabbitMQWorker:
def __init__(self):
self.connection = None
self.channel = None
def rabbitmq_connection(self):
"""Устанавливает подключение к RabbitMQ."""
try:
credentials = pika.PlainCredentials(RABBITMQ_LOGIN, RABBITMQ_PASS)
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300
)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
logging.info("RabbitMQ connection established.")
except Exception as e:
logging.error(f"Error establishing RabbitMQ connection: {e}")
self.connection = None
self.channel = None
async def consume_from_queue(self):
"""Основной цикл обработки сообщений из RabbitMQ."""
while True:
try:
if not self.connection or self.connection.is_closed:
self.rabbitmq_connection()
for method_frame, properties, body in self.channel.consume(RABBITMQ_QUEUE, inactivity_timeout=5):
if not method_frame:
continue # Нет новых сообщений, продолжаем ждать
# Декодируем сообщение из очереди
message = json.loads(body)
chat_id = message['chat_id']
username = message['username']
message_text = message['message']
try:
# Отправляем сообщение
await send_notification_message(chat_id, message_text, username)
self.channel.basic_ack(method_frame.delivery_tag) # Подтверждаем получение
except Exception as e:
logging.error(f"Error processing message: {e}")
self.channel.basic_nack(method_frame.delivery_tag) # Возвращаем сообщение в очередь
except pika.exceptions.AMQPConnectionError as e:
logging.error(f"RabbitMQ connection error: {e}. Reconnecting in 5 seconds...")
self.close_connection()
await asyncio.sleep(5)
except Exception as e:
logging.error(f"Unexpected error in RabbitMQ consumer: {e}")
await asyncio.sleep(5)
def close_connection(self):
"""Закрывает соединение с RabbitMQ."""
if self.connection and not self.connection.is_closed:
self.connection.close()
logging.info("RabbitMQ connection closed.")
async def run(self):
"""Запускает основной цикл обработки очереди."""
try:
await self.consume_from_queue()
except asyncio.CancelledError:
logging.info("RabbitMQ consumer stopped.")
finally:
self.close_connection()