diff --git a/.dockerignore b/.dockerignore index 30ec5c3..bb2e957 100644 --- a/.dockerignore +++ b/.dockerignore @@ -8,4 +8,4 @@ /telezab.db /db/ /db/telezab.db - +/trash/ diff --git a/rabbitmq_worker.py b/rabbitmq_worker.py new file mode 100644 index 0000000..43b3c36 --- /dev/null +++ b/rabbitmq_worker.py @@ -0,0 +1,90 @@ +import os +import asyncio +import logging +import json +import time +import pika +from concurrent.futures import ThreadPoolExecutor +from functools import partial + +# RabbitMQ configuration +RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost') +RABBITMQ_QUEUE = 'telegram_notifications' +RABBITMQ_LOGIN = os.getenv('RABBITMQ_LOGIN') +RABBITMQ_PASS = os.getenv('RABBITMQ_PASS') + +# Импорт функций для отправки сообщений +from telezab import send_notification_message # Замените на актуальный путь к send_notification_message + + +class RabbitMQWorker: + def __init__(self): + self.connection = None + self.channel = None + + def rabbitmq_connection(self): + """Устанавливает подключение к RabbitMQ.""" + try: + credentials = pika.PlainCredentials(RABBITMQ_LOGIN, RABBITMQ_PASS) + parameters = pika.ConnectionParameters( + host=RABBITMQ_HOST, + credentials=credentials, + heartbeat=600, + blocked_connection_timeout=300 + ) + self.connection = pika.BlockingConnection(parameters) + self.channel = self.connection.channel() + self.channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True) + logging.info("RabbitMQ connection established.") + except Exception as e: + logging.error(f"Error establishing RabbitMQ connection: {e}") + self.connection = None + self.channel = None + + async def consume_from_queue(self): + """Основной цикл обработки сообщений из RabbitMQ.""" + while True: + try: + if not self.connection or self.connection.is_closed: + self.rabbitmq_connection() + + for method_frame, properties, body in self.channel.consume(RABBITMQ_QUEUE, inactivity_timeout=5): + if not method_frame: + continue # Нет новых сообщений, продолжаем ждать + + # Декодируем сообщение из очереди + message = json.loads(body) + chat_id = message['chat_id'] + username = message['username'] + message_text = message['message'] + + try: + # Отправляем сообщение + await send_notification_message(chat_id, message_text, username) + self.channel.basic_ack(method_frame.delivery_tag) # Подтверждаем получение + except Exception as e: + logging.error(f"Error processing message: {e}") + self.channel.basic_nack(method_frame.delivery_tag) # Возвращаем сообщение в очередь + + except pika.exceptions.AMQPConnectionError as e: + logging.error(f"RabbitMQ connection error: {e}. Reconnecting in 5 seconds...") + self.close_connection() + await asyncio.sleep(5) + except Exception as e: + logging.error(f"Unexpected error in RabbitMQ consumer: {e}") + await asyncio.sleep(5) + + def close_connection(self): + """Закрывает соединение с RabbitMQ.""" + if self.connection and not self.connection.is_closed: + self.connection.close() + logging.info("RabbitMQ connection closed.") + + async def run(self): + """Запускает основной цикл обработки очереди.""" + try: + await self.consume_from_queue() + except asyncio.CancelledError: + logging.info("RabbitMQ consumer stopped.") + finally: + self.close_connection() diff --git a/requirements.txt b/requirements.txt index 46b2ccc..aeb851b 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/telezab.py b/telezab.py index f0e7157..66cff1e 100644 --- a/telezab.py +++ b/telezab.py @@ -17,6 +17,7 @@ import time import asyncio import aiohttp import pika +import aio_pika import json from concurrent.futures import ThreadPoolExecutor from pyzabbix import ZabbixAPI @@ -27,35 +28,13 @@ import re import urllib.parse from log_manager import LogManager +# from rabbitmq_worker import RABBITMQ_LOGIN from region_api import RegionAPI from user_state_manager import UserStateManager # Load environment variables load_dotenv() -# Функция для загрузки значения из файла -def load_value_from_file(file_name): - try: - with open(file_name, 'r') as file: - return file.read().strip() - except FileNotFoundError: - return None - -# Функция для получения переменной из окружения или файла -# def get_variable_value(variable_name): -# # Попытка получить значение из окружения -# value = os.getenv(variable_name) -# -# # Если переменная окружения не установлена, попробуем загрузить из файла -# if not value: -# file_value = "file_" + variable_name -# value = os.getenv(file_value) -# with open(value, 'r') as file: -# value = file.read() -# return value -# return value - -# DEV = os.getenv('DEV') # Загрузка переменных окружения или значений из файлов TOKEN = os.getenv('TELEGRAM_TOKEN') @@ -70,6 +49,7 @@ RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost') RABBITMQ_QUEUE = 'telegram_notifications' RABBITMQ_LOGIN = os.getenv('RABBITMQ_LOGIN') RABBITMQ_PASS = os.getenv('RABBITMQ_PASS') +RABBITMQ_URL = "amqp://"+ RABBITMQ_LOGIN + ":" + RABBITMQ_PASS + "@" + RABBITMQ_HOST +"/" # Замените на ваш URL RabbitMQ # Инициализируем класс RegionApi region_api = RegionAPI(DB_PATH) # Инициализируем класс UserStateManager @@ -101,64 +81,74 @@ rate_limit_semaphore = asyncio.Semaphore(25) # 25 messages per second def init_db(): try: + # 1️⃣ Проверяем и создаём каталог, если его нет + db_dir = os.path.dirname(DB_PATH) + if not os.path.exists(db_dir): + os.makedirs(db_dir, exist_ok=True) # Создаём каталог рекурсивно + + # 2️⃣ Проверяем, существует ли файл базы данных + db_exists = os.path.exists(DB_PATH) + + # 3️⃣ Открываем соединение, если файла нет, он создастся автоматически with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() - # Create events table - cursor.execute('''CREATE TABLE IF NOT EXISTS events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - hash TEXT UNIQUE, - data TEXT, - delivered BOOLEAN)''') + # 4️⃣ Если базы не было, создаём таблицы + if not db_exists: + cursor.execute('''CREATE TABLE events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + hash TEXT UNIQUE, + data TEXT, + delivered BOOLEAN)''') - # Create subscriptions table with username and active flag - cursor.execute('''CREATE TABLE IF NOT EXISTS subscriptions ( - chat_id INTEGER, - region_id TEXT, - username TEXT, - active BOOLEAN DEFAULT TRUE, - skip BOOLEAN DEFAULT FALSE, - disaster_only BOOLEAN DEFAULT FALSE, - UNIQUE(chat_id, region_id))''') + cursor.execute('''CREATE TABLE subscriptions ( + chat_id INTEGER, + region_id TEXT, + username TEXT, + active BOOLEAN DEFAULT TRUE, + skip BOOLEAN DEFAULT FALSE, + disaster_only BOOLEAN DEFAULT FALSE, + UNIQUE(chat_id, region_id))''') - # Create whitelist table - cursor.execute('''CREATE TABLE IF NOT EXISTS whitelist ( - chat_id INTEGER PRIMARY KEY, - username TEXT, - user_email TEXT)''') + cursor.execute('''CREATE TABLE whitelist ( + chat_id INTEGER PRIMARY KEY, + username TEXT, + user_email TEXT)''') - # Create whitelist table - cursor.execute('''CREATE TABLE IF NOT EXISTS admins ( - chat_id INTEGER PRIMARY KEY, - username TEXT)''') - # Create regions table with active flag - cursor.execute('''CREATE TABLE IF NOT EXISTS regions ( - region_id TEXT PRIMARY KEY, - region_name TEXT, - active BOOLEAN DEFAULT TRUE)''') + cursor.execute('''CREATE TABLE admins ( + chat_id INTEGER PRIMARY KEY, + username TEXT)''') - # Create user events table for logging - cursor.execute('''CREATE TABLE IF NOT EXISTS user_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - chat_id INTEGER, - username TEXT, - action TEXT, - timestamp TEXT)''') + cursor.execute('''CREATE TABLE regions ( + region_id TEXT PRIMARY KEY, + region_name TEXT, + active BOOLEAN DEFAULT TRUE)''') - # Insert sample regions - cursor.execute('''INSERT OR IGNORE INTO regions (region_id, region_name) VALUES - ('01', 'Адыгея'), - ('02', 'Башкортостан (Уфа)'), - ('04', 'Алтай'), - ('19', 'Республика Хакасия')''') + cursor.execute('''CREATE TABLE user_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + chat_id INTEGER, + username TEXT, + action TEXT, + timestamp TEXT)''') + + # Добавляем тестовые данные (если их нет) + cursor.execute('''INSERT OR IGNORE INTO regions (region_id, region_name) VALUES + ('01', 'Адыгея'), + ('02', 'Башкортостан (Уфа)'), + ('04', 'Алтай'), + ('19', 'Республика Хакасия')''') + + conn.commit() + app.logger.info("✅ Database created and initialized successfully.") + else: + app.logger.info("✅ Database already exists. Skipping initialization.") - conn.commit() - app.logger.info("Database initialized successfully.") except Exception as e: - app.logger.error(f"Error initializing database: {e}") + app.logger.error(f"❌ Error initializing database: {e}") finally: - conn.close() + if 'conn' in locals(): # Проверяем, была ли создана переменная conn + conn.close() @@ -855,24 +845,77 @@ def send_to_queue(message): connection.close() +# async def consume_from_queue(): +# connection, channel = rabbitmq_connection() +# +# for method_frame, properties, body in channel.consume(RABBITMQ_QUEUE): +# message = json.loads(body) +# chat_id = message['chat_id'] +# username = message['username'] +# message_text = message['message'] +# +# try: +# await send_notification_message(chat_id, message_text, username) +# channel.basic_ack(method_frame.delivery_tag) +# except Exception as e: +# telebot.logger.error(f"Error sending message from queue: {e}") +# # Optionally, you can nack the message to requeue it +# # channel.basic_nack(method_frame.delivery_tag) +# +# connection.close() + + async def consume_from_queue(): - connection, channel = rabbitmq_connection() - - for method_frame, properties, body in channel.consume(RABBITMQ_QUEUE): - message = json.loads(body) - chat_id = message['chat_id'] - username = message['username'] - message_text = message['message'] - + while True: # Бесконечный цикл для переподключения try: - await send_notification_message(chat_id, message_text, username) - channel.basic_ack(method_frame.delivery_tag) - except Exception as e: - telebot.logger.error(f"Error sending message from queue: {e}") - # Optionally, you can nack the message to requeue it - # channel.basic_nack(method_frame.delivery_tag) + # Подключение к RabbitMQ + connection = await aio_pika.connect_robust(RABBITMQ_URL) + async with connection: + # Открываем канал + channel = await connection.channel() + # Объявляем очередь (если нужно) + queue = await channel.declare_queue(RABBITMQ_QUEUE, durable=True) - connection.close() + # Потребляем сообщения + async for message in queue: + async with message.process(): # Авто подтверждение сообщения + try: + # Парсинг сообщения + data = json.loads(message.body) + + # Проверка структуры сообщения + if not isinstance(data, dict): + raise ValueError("Invalid message format: Expected a dictionary") + + # Извлечение необходимых данных + chat_id = data.get("chat_id") + username = data.get("username") + message_text = data.get("message") + + # Проверка обязательных полей + if not all([chat_id, username, message_text]): + raise ValueError(f"Missing required fields in message: {data}") + + # Отправляем сообщение + await send_notification_message(chat_id, message_text, username) + except json.JSONDecodeError: + # Логируем некорректный JSON + telebot.logger.error(f"Failed to decode message: {message.body}") + except ValueError as ve: + # Логируем ошибку формата сообщения + telebot.logger.error(f"Invalid message: {ve}") + except Exception as e: + # Логируем общую ошибку при обработке + telebot.logger.error(f"Error sending message from queue: {e}") + except aio_pika.exceptions.AMQPError as e: + # Логируем ошибку RabbitMQ и переподключаемся + telebot.logger.error(f"RabbitMQ error: {e}") + except Exception as e: + # Логируем общую ошибку и ждем перед переподключением + telebot.logger.error(f"Critical error in consume_from_queue: {e}") + finally: + # Задержка перед переподключением + await asyncio.sleep(5) async def send_message(chat_id, message, is_notification=False): @@ -892,6 +935,9 @@ async def send_message(chat_id, message, is_notification=False): telebot.logger.warning(f"Rate limit exceeded for chat_id {chat_id}. Retrying...") await asyncio.sleep(1) await send_message(chat_id, message, is_notification) + elif "403" in str(e): + telebot.logger.warning(f"Can't send message to user because bot blocked by user with chat id: {chat_id}") + pass else: telebot.logger.error(f"Failed to send message to {chat_id}: {e}") telebot.logger.error(f"Detailed Error: {e}", exc_info=True) # Добавлено логирование исключения @@ -902,13 +948,14 @@ async def send_message(chat_id, message, is_notification=False): finally: if is_notification: rate_limit_semaphore.release() - + formatted_message = message.replace('\n', ' ').replace('\r', '') + telebot.logger.info(f'Send notification to {chat_id} from RabbitMQ [{formatted_message}]') async def send_notification_message(chat_id, message, username): await send_message(chat_id, message, is_notification=True) - formatted_message = message.replace('\n', ' ').replace('\r', '') - telebot.logger.info(f'Send notification to {username} {chat_id} from RabbitMQ [{formatted_message}]') + # formatted_message = message.replace('\n', ' ').replace('\r', '') + # telebot.logger.info(f'Send notification to {username} {chat_id} from RabbitMQ [{formatted_message}]') async def run_in_executor(func, *args): @@ -1309,10 +1356,10 @@ def webhook(): return jsonify({"status": "error", "message": "Invalid host format"}), 400 app.logger.debug(f"Извлечён номер региона: {region_id}") - # Запрос подписчиков для отправки уведомления в зависимости от уровня опасности - if data['severity'] == '5': # Авария + # Запрос подписчиков для отправки уведомления в зависимости от уровня критичности + if data['severity'] == 'Disaster': # Авария query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE' - else: # Высокая опасность + else: # Высокая критичность query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE AND disaster_only = FALSE' app.logger.debug(f"Выполнение запроса: {query} для region_id={region_id}") @@ -1334,7 +1381,8 @@ def webhook(): # Отправляем сообщения подписчикам for chat_id, username in results: formatted_message = message.replace('\n',' ').replace('\r','') - app.logger.info(f"Отправка сообщения пользователю @{username} (chat_id={chat_id}) [{formatted_message}]") + + app.logger.info(f"Формирование сообщения для пользователя {username} (chat_id={chat_id}) [{formatted_message}]") try: send_to_queue({'chat_id': chat_id, 'username': username, 'message': message}) app.logger.debug(f"Сообщение поставлено в очередь для {chat_id} (@{username})") @@ -1377,13 +1425,13 @@ def format_message(data): 'Disaster': '⛔️' } priority = priority_map.get(data['severity']) - + msg = escape_telegram_chars(data['msg']) if data['status'].upper() == "PROBLEM": message = ( f"{priority} {data['host']} ({data['ip']})\n" - f"Описание: {data['msg']}\n" + f"Описание: {msg}\n" f"Критичность: {data['severity']}\n" - f"Время возникновения проблемы: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n" + f"Время возникновения: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n" ) if 'link' in data: message += f'URL: Ссылка на график' @@ -1391,10 +1439,10 @@ def format_message(data): else: message = ( f"✅ {data['host']} ({data['ip']})\n" - f"Описание: {data['msg']}\n" + f"Описание: {msg}\n" f"Критичность: {data['severity']}\n" f"Проблема устранена!\n" - f"Время устранения проблемы: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n" + f"Время устранения: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n" ) if 'link' in data: message += f'URL: Ссылка на график' @@ -1760,6 +1808,7 @@ def main(): Thread(target=run_flask, daemon=True).start() Thread(target=run_polling, daemon=True).start() # Запуск асинхронных задач + asyncio.run(consume_from_queue()) diff --git a/zabbix_manager.py b/zabbix_manager.py new file mode 100644 index 0000000..c2d0974 --- /dev/null +++ b/zabbix_manager.py @@ -0,0 +1,116 @@ +import logging +import asyncio +from pyzabbix import ZabbixAPI +from datetime import datetime +from pytz import timezone +import os + + + +class ZabbixTriggerManager: + def __init__(self): + self.zabbix_url = os.getenv('ZABBIX_URL') # URL Zabbix из переменных окружения + self.api_token = os.getenv('ZABBIX_API_TOKEN') # API токен Zabbix из переменных окружения + self.moskva_tz = timezone('Europe/Moscow') # Часовой пояс + self.priority_map = {'4': 'HIGH', '5': 'DISASTER'} # Отображение приоритетов + self.zapi = None # Клиент Zabbix API + + def login(self) -> None: + """ + Авторизация в Zabbix API. + """ + try: + self.zapi = ZabbixAPI(self.zabbix_url) + self.zapi.login(api_token=self.api_token) + logging.info("Successfully logged in to Zabbix API.") + except Exception as e: + logging.error(f"Error logging in to Zabbix API: {e}") + raise + + def get_problems_for_group(self, group_id: str) -> list: + """ + Получает проблемы для указанной группы хостов. + """ + try: + logging.info(f"Fetching problems for group {group_id}") + problems = self.zapi.problem.get( + output=["eventid", "name", "severity", "clock"], + groupids=group_id, + recent=1, # Только текущие проблемы + # sortfield=["clock"], + # sortorder="DESC" + ) + logging.info(f"Found {len(problems)} problems for group {group_id}") + return problems + except Exception as e: + logging.error(f"Error fetching problems for group {group_id}: {e}") + return [] + + def get_problems_for_region(self, region_id: str) -> list: + """ + Получает проблемы для всех групп, связанных с регионом. + """ + try: + logging.info(f"Fetching host groups for region {region_id}") + host_groups = self.zapi.hostgroup.get(output=["groupid", "name"], search={"name": region_id}) + filtered_groups = [ + group for group in host_groups + if 'test' not in group['name'].lower() and f'_{region_id}' in group['name'] + ] + + all_problems = [] + for group in filtered_groups: + problems = self.get_problems_for_group(group['groupid']) + all_problems.extend(problems) + + return all_problems + except Exception as e: + logging.error(f"Error fetching problems for region {region_id}: {e}") + return [] + + async def send_problems_to_user(self, chat_id: int, problems: list, bot) -> None: + """ + Отправляет список проблем пользователю в Telegram с учётом задержки для обхода лимита API. + """ + if not problems: + logging.info(f"No problems to send to chat_id {chat_id}.") + bot.send_message(chat_id, "Нет активных проблем.") + return + + for problem in problems: + try: + # Форматируем время + event_time = datetime.fromtimestamp(int(problem['clock']), tz=self.moskva_tz) + event_time_formatted = event_time.strftime('%Y-%m-%d %H:%M:%S Мск') + + # Определяем критичность + severity = self.priority_map.get(problem['severity'], "Неизвестно") + priority_mark = '⚠️' if severity == 'HIGH' else '⛔️' if severity == 'DISASTER' else '' + + # Определяем имя хоста + hosts = self._get_problem_hosts(problem) + host = hosts[0]["name"] if hosts else "Неизвестно" + + # Генерируем URL для графика + item_ids = [item["itemid"] for item in problem.get("items", [])] + url = f"{self.zabbix_url}/history.php?action=batchgraph&" + \ + "&".join([f"itemids[{item_id}]={item_id}" for item_id in item_ids]) + "&graphtype=0" + + # Формируем сообщение + description = self.escape_telegram_chars(problem['name']) + message = ( + f"{priority_mark} Host: {host}\n" + f"Описание: {description}\n" + f"Критичность: {severity}\n" + f"Время создания: {event_time_formatted}\n" + f'URL: Ссылка на график' + ) + + # Отправляем сообщение + bot.send_message(chat_id, message, parse_mode="HTML") + + # Задержка для соблюдения рейтлимита + await asyncio.sleep(0.04) # 40 мс = максимум 25 сообщений в секунду + except Exception as e: + logging.error(f"Error sending problem to chat_id {chat_id}: {e}") +