91 lines
3.8 KiB
Python
91 lines
3.8 KiB
Python
import os
|
||
import asyncio
|
||
import logging
|
||
import json
|
||
import time
|
||
import pika
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from functools import partial
|
||
|
||
# RabbitMQ configuration
|
||
RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost')
|
||
RABBITMQ_QUEUE = 'telegram_notifications'
|
||
RABBITMQ_LOGIN = os.getenv('RABBITMQ_LOGIN')
|
||
RABBITMQ_PASS = os.getenv('RABBITMQ_PASS')
|
||
|
||
# Импорт функций для отправки сообщений
|
||
from telezab import send_notification_message # Замените на актуальный путь к send_notification_message
|
||
|
||
|
||
class RabbitMQWorker:
|
||
def __init__(self):
|
||
self.connection = None
|
||
self.channel = None
|
||
|
||
def rabbitmq_connection(self):
|
||
"""Устанавливает подключение к RabbitMQ."""
|
||
try:
|
||
credentials = pika.PlainCredentials(RABBITMQ_LOGIN, RABBITMQ_PASS)
|
||
parameters = pika.ConnectionParameters(
|
||
host=RABBITMQ_HOST,
|
||
credentials=credentials,
|
||
heartbeat=600,
|
||
blocked_connection_timeout=300
|
||
)
|
||
self.connection = pika.BlockingConnection(parameters)
|
||
self.channel = self.connection.channel()
|
||
self.channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
|
||
logging.info("RabbitMQ connection established.")
|
||
except Exception as e:
|
||
logging.error(f"Error establishing RabbitMQ connection: {e}")
|
||
self.connection = None
|
||
self.channel = None
|
||
|
||
async def consume_from_queue(self):
|
||
"""Основной цикл обработки сообщений из RabbitMQ."""
|
||
while True:
|
||
try:
|
||
if not self.connection or self.connection.is_closed:
|
||
self.rabbitmq_connection()
|
||
|
||
for method_frame, properties, body in self.channel.consume(RABBITMQ_QUEUE, inactivity_timeout=5):
|
||
if not method_frame:
|
||
continue # Нет новых сообщений, продолжаем ждать
|
||
|
||
# Декодируем сообщение из очереди
|
||
message = json.loads(body)
|
||
chat_id = message['chat_id']
|
||
username = message['username']
|
||
message_text = message['message']
|
||
|
||
try:
|
||
# Отправляем сообщение
|
||
await send_notification_message(chat_id, message_text, username)
|
||
self.channel.basic_ack(method_frame.delivery_tag) # Подтверждаем получение
|
||
except Exception as e:
|
||
logging.error(f"Error processing message: {e}")
|
||
self.channel.basic_nack(method_frame.delivery_tag) # Возвращаем сообщение в очередь
|
||
|
||
except pika.exceptions.AMQPConnectionError as e:
|
||
logging.error(f"RabbitMQ connection error: {e}. Reconnecting in 5 seconds...")
|
||
self.close_connection()
|
||
await asyncio.sleep(5)
|
||
except Exception as e:
|
||
logging.error(f"Unexpected error in RabbitMQ consumer: {e}")
|
||
await asyncio.sleep(5)
|
||
|
||
def close_connection(self):
|
||
"""Закрывает соединение с RabbitMQ."""
|
||
if self.connection and not self.connection.is_closed:
|
||
self.connection.close()
|
||
logging.info("RabbitMQ connection closed.")
|
||
|
||
async def run(self):
|
||
"""Запускает основной цикл обработки очереди."""
|
||
try:
|
||
await self.consume_from_queue()
|
||
except asyncio.CancelledError:
|
||
logging.info("RabbitMQ consumer stopped.")
|
||
finally:
|
||
self.close_connection()
|