Add RabbitMQ for guaranted delivery and rate_limit for message to avoid bans from Telegram Api

This commit is contained in:
Udo Chudo 2024-07-13 22:18:16 +05:00
parent 9f25be7ad9
commit 5ccd21ab18

View File

@ -8,6 +8,11 @@ from logging.config import dictConfig
from threading import Thread, Lock, Timer from threading import Thread, Lock, Timer
import sqlite3 import sqlite3
import time import time
import asyncio
import aiohttp
import pika
import json
from concurrent.futures import ThreadPoolExecutor
# Load environment variables # Load environment variables
load_dotenv() load_dotenv()
@ -50,6 +55,9 @@ bot = telebot.TeleBot(TOKEN)
# Lock for database operations # Lock for database operations
db_lock = Lock() db_lock = Lock()
# Semaphore for rate limiting
rate_limit_semaphore = asyncio.Semaphore(25) # 25 messages per second
# Define states # Define states
NOTIFICATION_MODE = 1 NOTIFICATION_MODE = 1
SETTINGS_MODE = 2 SETTINGS_MODE = 2
@ -512,8 +520,6 @@ def process_add_region(message):
conn.close() conn.close()
except (IndexError, ValueError): except (IndexError, ValueError):
bot.send_message(chat_id, "Неверный формат. Используйте: <region_id> <region_name>") bot.send_message(chat_id, "Неверный формат. Используйте: <region_id> <region_name>")
# Remove this line to avoid repetitive settings menu message
# show_settings_menu(chat_id)
@bot.callback_query_handler(func=lambda call: call.data.startswith("replace_") or call.data.startswith("reactivate_")) @bot.callback_query_handler(func=lambda call: call.data.startswith("replace_") or call.data.startswith("reactivate_"))
def handle_region_action(call): def handle_region_action(call):
@ -636,6 +642,83 @@ def handle_active_regions(message):
bot.send_message(chat_id, f"Активные регионы:\n{regions_list}") bot.send_message(chat_id, f"Активные регионы:\n{regions_list}")
show_settings_menu(chat_id) show_settings_menu(chat_id)
# RabbitMQ configuration
RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost')
RABBITMQ_QUEUE = 'telegram_notifications'
def rabbitmq_connection():
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
return connection, channel
def send_to_queue(message):
connection, channel = rabbitmq_connection()
channel.basic_publish(
exchange='',
routing_key=RABBITMQ_QUEUE,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
connection.close()
async def consume_from_queue():
connection, channel = rabbitmq_connection()
for method_frame, properties, body in channel.consume(RABBITMQ_QUEUE):
message = json.loads(body)
chat_id = message['chat_id']
message_text = message['message']
try:
await send_notification_message(chat_id, message_text)
channel.basic_ack(method_frame.delivery_tag)
except Exception as e:
app.logger.error(f"Error sending message from queue: {e}")
# Optionally, you can nack the message to requeue it
# channel.basic_nack(method_frame.delivery_tag)
connection.close()
async def send_message(chat_id, message, is_notification=False):
try:
if is_notification:
await rate_limit_semaphore.acquire()
await run_in_executor(bot.send_message, chat_id, message)
except telebot.apihelper.ApiTelegramException as e:
if "429" in str(e):
app.logger.warning(f"Rate limit exceeded for chat_id {chat_id}. Retrying...")
await asyncio.sleep(1)
await send_message(chat_id, message, is_notification)
else:
app.logger.error(f"Failed to send message to {chat_id}: {e}")
except Exception as e:
app.logger.error(f"Error sending message to {chat_id}: {e}")
await check_telegram_api()
finally:
if is_notification:
rate_limit_semaphore.release()
async def send_notification_message(chat_id, message):
await send_message(chat_id, message, is_notification=True)
async def run_in_executor(func, *args):
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
return await loop.run_in_executor(pool, func, *args)
async def check_telegram_api():
try:
async with aiohttp.ClientSession() as session:
async with session.get('https://api.telegram.org') as response:
if response.status == 200:
app.logger.info("Telegram API is reachable.")
else:
app.logger.error("Telegram API is not reachable.")
except Exception as e:
app.logger.error(f"Error checking Telegram API: {e}")
@app.route('/webhook', methods=['POST']) @app.route('/webhook', methods=['POST'])
def webhook(): def webhook():
data = request.get_json() data = request.get_json()
@ -673,14 +756,9 @@ def webhook():
if region_active: if region_active:
message = format_message(data) message = format_message(data)
for chat_id, username in results: for chat_id, username in results:
try: app.logger.debug(f"Queueing message: {message} to chat_id={chat_id}, username={username}")
app.logger.debug(f"Sending message: {message} to chat_id={chat_id}, username={username}") send_to_queue({'chat_id': chat_id, 'message': message})
bot.send_message(chat_id, message) app.logger.info(f"Queued alert for {chat_id} ({username}) for region {region_id}")
app.logger.info(f"Sent alert to {chat_id} ({username}) for region {region_id}")
except telebot.apihelper.ApiTelegramException as e:
app.logger.error(f"Failed to send message to {chat_id} ({username}): {e}")
except Exception as e:
app.logger.error(f"Error sending message to {chat_id} ({username}): {e}")
conn.commit() conn.commit()
conn.close() conn.close()
@ -701,7 +779,12 @@ if __name__ == '__main__':
init_db() init_db()
# Start Flask app in a separate thread # Start Flask app in a separate thread
Thread(target=app.run, kwargs={'port': 5000, 'debug': True, 'use_reloader': False}, daemon=True).start() Thread(target=app.run, kwargs={'port': 5000, 'host': '0.0.0.0', 'debug': True, 'use_reloader': False}, daemon=True).start()
# Start bot polling # Start bot polling in a separate thread
run_polling() Thread(target=run_polling, daemon=True).start()
# Start async message consumer
loop = asyncio.get_event_loop()
loop.create_task(consume_from_queue())
loop.run_forever()