From 5ccd21ab18045c9064c1933c4eaa4ef188b36a55 Mon Sep 17 00:00:00 2001 From: UdoChudo Date: Sat, 13 Jul 2024 22:18:16 +0500 Subject: [PATCH] Add RabbitMQ for guaranted delivery and rate_limit for message to avoid bans from Telegram Api --- telezab.py | 109 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 96 insertions(+), 13 deletions(-) diff --git a/telezab.py b/telezab.py index f353c23..8249db6 100644 --- a/telezab.py +++ b/telezab.py @@ -8,6 +8,11 @@ from logging.config import dictConfig from threading import Thread, Lock, Timer import sqlite3 import time +import asyncio +import aiohttp +import pika +import json +from concurrent.futures import ThreadPoolExecutor # Load environment variables load_dotenv() @@ -50,6 +55,9 @@ bot = telebot.TeleBot(TOKEN) # Lock for database operations db_lock = Lock() +# Semaphore for rate limiting +rate_limit_semaphore = asyncio.Semaphore(25) # 25 messages per second + # Define states NOTIFICATION_MODE = 1 SETTINGS_MODE = 2 @@ -512,8 +520,6 @@ def process_add_region(message): conn.close() except (IndexError, ValueError): bot.send_message(chat_id, "Неверный формат. Используйте: ") - # Remove this line to avoid repetitive settings menu message - # show_settings_menu(chat_id) @bot.callback_query_handler(func=lambda call: call.data.startswith("replace_") or call.data.startswith("reactivate_")) def handle_region_action(call): @@ -636,6 +642,83 @@ def handle_active_regions(message): bot.send_message(chat_id, f"Активные регионы:\n{regions_list}") show_settings_menu(chat_id) +# RabbitMQ configuration +RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost') +RABBITMQ_QUEUE = 'telegram_notifications' + +def rabbitmq_connection(): + connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST)) + channel = connection.channel() + channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True) + return connection, channel + +def send_to_queue(message): + connection, channel = rabbitmq_connection() + channel.basic_publish( + exchange='', + routing_key=RABBITMQ_QUEUE, + body=json.dumps(message), + properties=pika.BasicProperties( + delivery_mode=2, # make message persistent + )) + connection.close() + +async def consume_from_queue(): + connection, channel = rabbitmq_connection() + + for method_frame, properties, body in channel.consume(RABBITMQ_QUEUE): + message = json.loads(body) + chat_id = message['chat_id'] + message_text = message['message'] + + try: + await send_notification_message(chat_id, message_text) + channel.basic_ack(method_frame.delivery_tag) + except Exception as e: + app.logger.error(f"Error sending message from queue: {e}") + # Optionally, you can nack the message to requeue it + # channel.basic_nack(method_frame.delivery_tag) + + connection.close() + +async def send_message(chat_id, message, is_notification=False): + try: + if is_notification: + await rate_limit_semaphore.acquire() + await run_in_executor(bot.send_message, chat_id, message) + except telebot.apihelper.ApiTelegramException as e: + if "429" in str(e): + app.logger.warning(f"Rate limit exceeded for chat_id {chat_id}. Retrying...") + await asyncio.sleep(1) + await send_message(chat_id, message, is_notification) + else: + app.logger.error(f"Failed to send message to {chat_id}: {e}") + except Exception as e: + app.logger.error(f"Error sending message to {chat_id}: {e}") + await check_telegram_api() + finally: + if is_notification: + rate_limit_semaphore.release() + +async def send_notification_message(chat_id, message): + await send_message(chat_id, message, is_notification=True) + +async def run_in_executor(func, *args): + loop = asyncio.get_event_loop() + with ThreadPoolExecutor() as pool: + return await loop.run_in_executor(pool, func, *args) + +async def check_telegram_api(): + try: + async with aiohttp.ClientSession() as session: + async with session.get('https://api.telegram.org') as response: + if response.status == 200: + app.logger.info("Telegram API is reachable.") + else: + app.logger.error("Telegram API is not reachable.") + except Exception as e: + app.logger.error(f"Error checking Telegram API: {e}") + @app.route('/webhook', methods=['POST']) def webhook(): data = request.get_json() @@ -673,14 +756,9 @@ def webhook(): if region_active: message = format_message(data) for chat_id, username in results: - try: - app.logger.debug(f"Sending message: {message} to chat_id={chat_id}, username={username}") - bot.send_message(chat_id, message) - app.logger.info(f"Sent alert to {chat_id} ({username}) for region {region_id}") - except telebot.apihelper.ApiTelegramException as e: - app.logger.error(f"Failed to send message to {chat_id} ({username}): {e}") - except Exception as e: - app.logger.error(f"Error sending message to {chat_id} ({username}): {e}") + app.logger.debug(f"Queueing message: {message} to chat_id={chat_id}, username={username}") + send_to_queue({'chat_id': chat_id, 'message': message}) + app.logger.info(f"Queued alert for {chat_id} ({username}) for region {region_id}") conn.commit() conn.close() @@ -701,7 +779,12 @@ if __name__ == '__main__': init_db() # Start Flask app in a separate thread - Thread(target=app.run, kwargs={'port': 5000, 'debug': True, 'use_reloader': False}, daemon=True).start() + Thread(target=app.run, kwargs={'port': 5000, 'host': '0.0.0.0', 'debug': True, 'use_reloader': False}, daemon=True).start() - # Start bot polling - run_polling() + # Start bot polling in a separate thread + Thread(target=run_polling, daemon=True).start() + + # Start async message consumer + loop = asyncio.get_event_loop() + loop.create_task(consume_from_queue()) + loop.run_forever()