import os from functools import partial from gc import callbacks from flask import Flask, request, jsonify, render_template import schedule from dotenv import load_dotenv import hashlib import telebot from telebot import types import logging from logging.config import dictConfig import zipfile from threading import Thread, Lock import sqlite3 import time import asyncio import aiohttp import pika import aio_pika import json from concurrent.futures import ThreadPoolExecutor from pyzabbix import ZabbixAPI import requests from pytz import timezone from datetime import datetime, timedelta import re import urllib.parse from log_manager import LogManager # from rabbitmq_worker import RABBITMQ_LOGIN from region_api import RegionAPI from user_state_manager import UserStateManager # Load environment variables load_dotenv() DEV = os.getenv('DEV') # Загрузка переменных окружения или значений из файлов TOKEN = os.getenv('TELEGRAM_TOKEN') ZABBIX_API_TOKEN = os.getenv('ZABBIX_API_TOKEN') ZABBIX_URL = os.getenv('ZABBIX_URL') DB_PATH = 'db/telezab.db' SUPPORT_EMAIL = "shiftsupport-rtmis@rtmis.ru" BASE_URL = '/telezab' # RabbitMQ configuration RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost') RABBITMQ_QUEUE = 'telegram_notifications' RABBITMQ_LOGIN = os.getenv('RABBITMQ_LOGIN') RABBITMQ_PASS = os.getenv('RABBITMQ_PASS') RABBITMQ_URL = "amqp://"+ RABBITMQ_LOGIN + ":" + RABBITMQ_PASS + "@" + RABBITMQ_HOST +"/" # Замените на ваш URL RabbitMQ # Инициализируем класс RegionApi region_api = RegionAPI(DB_PATH) # Инициализируем класс UserStateManager user_state_manager = UserStateManager() # Initialize Flask application app = Flask(__name__,static_url_path='/static', template_folder='templates') # Инициализация LogManager log_manager = LogManager(log_dir='logs', retention_days=30) # Настройка уровня логирования для Flask app.logger.setLevel(logging.INFO) # Настройка pyTelegramBotAPI logger telebot.logger = logging.getLogger('telebot') # Важно: вызов schedule_log_rotation для планировки ротации и архивации логов log_manager.schedule_log_rotation() bot = telebot.TeleBot(TOKEN) # Lock for database operations db_lock = Lock() # Semaphore for rate limiting rate_limit_semaphore = asyncio.Semaphore(25) # 25 messages per second def init_db(): try: # 1️⃣ Проверяем и создаём каталог, если его нет db_dir = os.path.dirname(DB_PATH) if not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) # Создаём каталог рекурсивно # 2️⃣ Проверяем, существует ли файл базы данных db_exists = os.path.exists(DB_PATH) # 3️⃣ Открываем соединение, если файла нет, он создастся автоматически with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # 4️⃣ Если базы не было, создаём таблицы if not db_exists: cursor.execute('''CREATE TABLE events ( id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT UNIQUE, data TEXT, delivered BOOLEAN)''') cursor.execute('''CREATE TABLE subscriptions ( chat_id INTEGER, region_id TEXT, username TEXT, active BOOLEAN DEFAULT TRUE, skip BOOLEAN DEFAULT FALSE, disaster_only BOOLEAN DEFAULT FALSE, UNIQUE(chat_id, region_id))''') cursor.execute('''CREATE TABLE whitelist ( chat_id INTEGER PRIMARY KEY, username TEXT, user_email TEXT)''') cursor.execute('''CREATE TABLE admins ( chat_id INTEGER PRIMARY KEY, username TEXT)''') cursor.execute('''CREATE TABLE regions ( region_id TEXT PRIMARY KEY, region_name TEXT, active BOOLEAN DEFAULT TRUE)''') cursor.execute('''CREATE TABLE user_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER, username TEXT, action TEXT, timestamp TEXT)''') # Добавляем тестовые данные (если их нет) cursor.execute('''INSERT OR IGNORE INTO regions (region_id, region_name) VALUES ('01', 'Адыгея'), ('02', 'Башкортостан (Уфа)'), ('04', 'Алтай'), ('19', 'Республика Хакасия')''') conn.commit() app.logger.info("✅ Database created and initialized successfully.") else: app.logger.info("✅ Database already exists. Skipping initialization.") except Exception as e: app.logger.error(f"❌ Error initializing database: {e}") finally: if 'conn' in locals(): # Проверяем, была ли создана переменная conn conn.close() # Hash the incoming data def hash_data(data): return hashlib.sha256(str(data).encode('utf-8')).hexdigest() # Check if user is in whitelist def is_whitelisted(chat_id): with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() query = 'SELECT COUNT(*) FROM whitelist WHERE chat_id = ?' telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}") cursor.execute(query, (chat_id,)) count = cursor.fetchone()[0] conn.close() return count > 0 # Add user to whitelist def add_to_whitelist(chat_id, username): with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() query = 'INSERT OR IGNORE INTO whitelist (chat_id, username) VALUES (?, ?)' telebot.logger.info(f"Executing query: {query} with chat_id={chat_id}, username={username}") try: cursor.execute(query, (chat_id, username)) conn.commit() except Exception as e: telebot.logger.error(f"Error during add to whitelist: {e}") finally: conn.close() def rundeck_add_to_whitelist(chat_id, username, user_email): with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Проверка существования chat_id check_query = 'SELECT COUNT(*) FROM whitelist WHERE chat_id = ?' cursor.execute(check_query, (chat_id,)) count = cursor.fetchone()[0] if count > 0: conn.close() return False # Пользователь уже существует # Вставка нового пользователя insert_query = 'INSERT INTO whitelist (chat_id, username, user_email) VALUES (?, ?, ?)' telebot.logger.info(f"Rundeck executing query: {insert_query} with chat_id={chat_id}, username={username}, email={user_email}") cursor.execute(insert_query, (chat_id, username, user_email)) conn.commit() conn.close() return True # Успешное добавление # Remove user from whitelist def remove_from_whitelist(chat_id): with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() query = 'DELETE FROM whitelist WHERE chat_id = ?' telebot.logger.info(f"Executing query: {query} with chat_id={chat_id}") cursor.execute(query, (chat_id,)) conn.commit() conn.close() def get_admins(): with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute('SELECT chat_id FROM admins') admins = cursor.fetchall() admins = [i[0] for i in admins] conn.close() return admins def get_sorted_regions(): with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute('SELECT region_id, region_name FROM regions WHERE active = TRUE') regions = cursor.fetchall() conn.close() # Сортируем регионы по числовому значению region_id regions.sort(key=lambda x: int(x[0])) return regions # Check if region exists def region_exists(region_id): with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM regions WHERE region_id = ? AND active = TRUE', (region_id,)) count = cursor.fetchone()[0] conn.close() return count > 0 # Get list of regions a user is subscribed to def get_user_subscribed_regions(chat_id): with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT regions.region_id, regions.region_name FROM subscriptions JOIN regions ON subscriptions.region_id = regions.region_id WHERE subscriptions.chat_id = ? AND subscriptions.active = TRUE AND subscriptions.skip = FALSE ORDER BY regions.region_id ''', (chat_id,)) regions = cursor.fetchall() conn.close() # Сортируем регионы по числовому значению region_id regions.sort(key=lambda x: int(x[0])) return regions # Check if user is subscribed to a region def is_subscribed(chat_id, region_id): with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() cursor.execute(''' SELECT COUNT(*) FROM subscriptions WHERE chat_id = ? AND region_id = ? AND active = TRUE AND skip = FALSE ''', (chat_id, region_id)) count = cursor.fetchone()[0] conn.close() return count > 0 # Format regions list def format_regions_list(regions): return '\n'.join([f"{region_id} - {region_name}" for region_id, region_name in regions]) def log_user_event(chat_id, username, action): timestamp = time.strftime('%Y-%m-%d %H:%M:%S') try: with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() query = 'INSERT INTO user_events (chat_id, username, action, timestamp) VALUES (?, ?, ?, ?)' telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}, username={username}, action={action}, timestamp={timestamp}") cursor.execute(query, (chat_id, username, action, timestamp)) conn.commit() telebot.logger.info(f"User event logged: {chat_id} ({username}) - {action} at {timestamp}.") except Exception as e: telebot.logger.error(f"Error logging user event: {e}") finally: conn.close() # Define states NOTIFICATION_MODE = 1 SETTINGS_MODE = 2 # Handle /help command to provide instructions @bot.message_handler(commands=['help']) def handle_help(message): chat_id = message.chat.id if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы неавторизованы для использования этого бота.") return help_text = ( '/start - Показать меню бота\n' 'Настройки - Перейти в режим настроек и управления подписками\n' 'Активные события - Получение всех нерешённых событий мониторинга по выбранным сервисам выбранного региона\n' 'Помощь - Описание всех возможностей бота' ) bot.send_message(message.chat.id, help_text, parse_mode="html") show_main_menu(message.chat.id) # Handle /register command for new user registration def handle_register(message): chat_id = message.chat.id username = message.from_user.username if username: username = f"@{username}" else: username = "N/A" text = (f'Для продолжения регистрации необходимо отправить с корпоративного почтового адреса "РТ МИС" письмо на адрес {SUPPORT_EMAIL}\n' f'В теме письма указать "Подтверждение регистрации в телеграм-боте TeleZab".\n' f'В теле письма указать:\n' f'1. ФИО\n' f'2. Ваш Chat ID: {chat_id}\n' f'3. Ваше имя пользователя: {username}') bot.send_message(chat_id,text,parse_mode="HTML") log_user_event(chat_id, username, "Requested registration") # Handle /start command @bot.message_handler(commands=['start']) def handle_start(message): show_main_menu(message.chat.id) def show_main_menu(chat_id): markup = telebot.types.ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True) if is_whitelisted(chat_id): user_state_manager.set_state(chat_id, "MAIN_MENU") markup.add('Настройки', 'Помощь', 'Активные события') else: user_state_manager.set_state(chat_id, "REGISTRATION") markup.add('Регистрация') bot.send_message(chat_id, "Выберите действие:", reply_markup=markup) def create_settings_keyboard(chat_id, admins_list): markup = telebot.types.ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True) # Линия 1: "Подписаться", "Отписаться" markup.row('Подписаться','Отписаться') markup.row('Мои подписки','Режим уведомлений') if DEV == '1': if chat_id in admins_list: markup.row('Активные регионы') markup.row('Добавить регион', 'Удалить регион') markup.row('Назад') return markup # Settings menu for users def show_settings_menu(chat_id): if not is_whitelisted(chat_id): user_state_manager.set_state(chat_id, "REGISTRATION") bot.send_message(chat_id, "Вы неавторизованы для использования этого бота") return admins_list = get_admins() markup = create_settings_keyboard(chat_id, admins_list) bot.send_message(chat_id, "Вы находитесь в режиме настроек. Выберите действие:", reply_markup=markup) # Основной обработчик меню @bot.message_handler(func=lambda message: True) def handle_menu_selection(message): chat_id = message.chat.id text = message.text.strip() username = message.from_user.username # Проверка авторизации if not is_whitelisted(chat_id) and text != 'Регистрация': bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.") return # Получаем текущее состояние пользователя current_state = user_state_manager.get_state(chat_id) # Обработка команд в зависимости от состояния if current_state == "MAIN_MENU": handle_main_menu(message, chat_id, text) elif current_state == "REGISTRATION": handle_register(message) elif current_state == "SETTINGS_MENU": handle_settings_menu(message, chat_id, text) elif current_state == "SUBSCRIBE": process_subscription_button(message, chat_id, username) elif current_state == "UNSUBSCRIBE": process_unsubscription_button(message, chat_id, username) # elif current_state == "ADD_REGION": # process_add_region_button(message, chat_id, username) # elif current_state == "REMOVE_REGION": # process_remove_region_button(message, chat_id, username) else: bot.send_message(chat_id, "Команда не распознана.") show_main_menu(chat_id) def handle_main_menu(message, chat_id, text): """Обработка команд в главном меню.""" if text == 'Регистрация': user_state_manager.set_state(chat_id, "REGISTRATION") handle_register(message) elif text == 'Настройки': user_state_manager.set_state(chat_id, "SETTINGS_MENU") show_settings_menu(chat_id) elif text == 'Помощь': handle_help(message) elif text == 'Активные события': handle_active_triggers(message) else: bot.send_message(chat_id, "Команда не распознана.") show_main_menu(chat_id) def handle_settings_menu(message, chat_id, text): """Обработка команд в меню настроек.""" admins_list = get_admins() if text.lower() == 'подписаться': user_state_manager.set_state(chat_id, "SUBSCRIBE") handle_subscribe_button(message) elif text.lower() == 'отписаться': user_state_manager.set_state(chat_id, "UNSUBSCRIBE") handle_unsubscribe_button(message) elif text.lower() == 'мои подписки': handle_my_subscriptions_button(message) elif text.lower() == 'активные регионы': handle_active_regions_button(message) elif text.lower() == "режим уведомлений": handle_notification_mode_button(message) elif text.lower() == 'добавить регион' and chat_id in admins_list: user_state_manager.set_state(chat_id, "ADD_REGION") handle_region_manager(chat_id, 'add') elif text.lower() == 'удалить регион' and chat_id in admins_list: user_state_manager.set_state(chat_id, "REMOVE_REGION") handle_region_manager(chat_id, 'remove') elif text.lower() == 'назад': user_state_manager.set_state(chat_id, "MAIN_MENU") show_main_menu(chat_id) else: bot.send_message(chat_id, "Команда не распознана.") show_settings_menu(chat_id) def handle_subscribe_button(message): chat_id = message.chat.id if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.") return username = message.from_user.username if username: username = f"@{username}" else: username = "N/A" regions_list = format_regions_list(get_sorted_regions()) markup = telebot.types.InlineKeyboardMarkup() markup.add(telebot.types.InlineKeyboardButton(text="Отмена", callback_data=f"cancel_action")) bot.send_message(chat_id, f"Отправьте номера регионов через запятую:\n{regions_list}\n",reply_markup=markup) bot.register_next_step_handler_by_chat_id(chat_id, process_subscription_button, chat_id, username) def process_subscription_button(message, chat_id, username): subbed_regions = [] invalid_regions = [] if message.text.lower() == 'отмена': bot.send_message(chat_id, "Действие отменено.") user_state_manager.set_state(chat_id, "SETTINGS_MENU") return show_settings_menu(chat_id) if not all(part.strip().isdigit() for part in message.text.split(',')): markup = telebot.types.InlineKeyboardMarkup() markup.add(telebot.types.InlineKeyboardButton(text="Отмена", callback_data=f"cancel_action")) bot.send_message(chat_id, "Неверный формат данных. Введите номер или номера регионов через запятую.", reply_markup=markup) bot.register_next_step_handler_by_chat_id(chat_id, process_subscription_button, chat_id, username) return region_ids = message.text.split(',') valid_region_ids = [region[0] for region in get_sorted_regions()] with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() for region_id in region_ids: region_id = region_id.strip() if region_id not in valid_region_ids: invalid_regions.append(region_id) continue cursor.execute('INSERT OR IGNORE INTO subscriptions (chat_id, region_id, username, active) VALUES (?, ?, ?, TRUE)', (chat_id, region_id, username)) if cursor.rowcount == 0: cursor.execute('UPDATE subscriptions SET active = TRUE WHERE chat_id = ? AND region_id = ?', (chat_id, region_id)) subbed_regions.append(region_id) conn.commit() if len(invalid_regions) > 0: bot.send_message(chat_id, f"Регион с ID {', '.join(invalid_regions)} не существует. Введите корректные номера или 'отмена'.") bot.send_message(chat_id, f"Подписка на регионы: {', '.join(subbed_regions)} оформлена.") log_user_event(chat_id, username, f"Subscribed to regions: {', '.join(subbed_regions)}") user_state_manager.set_state(chat_id, "SETTINGS_MENU") show_settings_menu(chat_id) def handle_unsubscribe_button(message): chat_id = message.chat.id if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.") telebot.logger.info(f"Unauthorized access attempt by {chat_id}") user_state_manager.set_state(chat_id, "REGISTRATION") return show_main_menu(chat_id) username = message.from_user.username if username: username = f"@{username}" else: username = "N/A" # Получаем список подписок пользователя user_regions = get_user_subscribed_regions(chat_id) if not user_regions: bot.send_message(chat_id, "Вы не подписаны ни на один регион.") user_state_manager.set_state(chat_id, "SETTINGS_MENU") return show_settings_menu(chat_id) regions_list = format_regions_list(user_regions) markup = telebot.types.InlineKeyboardMarkup() markup.add(telebot.types.InlineKeyboardButton(text="Отмена", callback_data=f"cancel_action")) bot.send_message(chat_id, f"Отправьте номер или номера регионов, от которых хотите отписаться (через запятую):\n{regions_list}\n",reply_markup=markup) bot.register_next_step_handler_by_chat_id(chat_id, process_unsubscription_button, chat_id, username) def process_unsubscription_button(message, chat_id, username): unsubbed_regions = [] invalid_regions = [] markup = telebot.types.InlineKeyboardMarkup() markup.add(telebot.types.InlineKeyboardButton(text="Отмена", callback_data=f"cancel_action")) if message.text.lower() == 'отмена': bot.send_message(chat_id, "Действие отменено.") user_state_manager.set_state(chat_id, "SETTINGS_MENU") return show_settings_menu(chat_id) # Проверка, что введённая строка содержит только цифры и запятые if not all(part.strip().isdigit() for part in message.text.split(',')): bot.send_message(chat_id, "Некорректный формат. Введите номера регионов через запятую.", reply_markup=markup) bot.register_next_step_handler_by_chat_id(chat_id, process_unsubscription_button, chat_id, username) return region_ids = message.text.split(',') valid_region_ids = [region[0] for region in get_user_subscribed_regions(chat_id)] with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() for region_id in region_ids: region_id = region_id.strip() if region_id not in valid_region_ids: invalid_regions.append(region_id) continue # Удаление подписки query = 'UPDATE subscriptions SET active = FALSE WHERE chat_id = ? AND region_id = ?' cursor.execute(query, (chat_id, region_id)) unsubbed_regions.append(region_id) conn.commit() if len(invalid_regions) > 0: bot.send_message(chat_id, f"Регион с ID {', '.join(invalid_regions)} не найден в ваших подписках.") bot.send_message(chat_id, f"Отписка от регионов: {', '.join(unsubbed_regions)} выполнена.") log_user_event(chat_id, username, f"Unsubscribed from regions: {', '.join(unsubbed_regions)}") user_state_manager.set_state(chat_id, "SETTINGS_MENU") show_settings_menu(chat_id) @bot.callback_query_handler(func=lambda call: call.data == "cancel_action") def handle_cancel_action(call): chat_id = call.message.chat.id message_id = call.message.message_id bot.clear_step_handler_by_chat_id(chat_id) bot.send_message(chat_id,f"Действие отменено") bot.edit_message_reply_markup(chat_id,message_id,reply_markup=None) user_state_manager.set_state(chat_id, "SETTINGS_MENU") show_settings_menu(chat_id) return @bot.callback_query_handler(func=lambda call: call.data == "cancel_active_triggers") def handle_cancel_active_triggers(call): chat_id = call.message.chat.id message_id = call.message.message_id bot.clear_step_handler_by_chat_id(chat_id) bot.send_message(chat_id,f"Действие отменено") bot.edit_message_reply_markup(chat_id,message_id,reply_markup=None) user_state_manager.set_state(chat_id, "MAIN_MENU") show_main_menu(chat_id) return ###################################################################################################################### # Handle admin region management commands ###################################################################################################################### def handle_region_manager(chat_id: int, action: str): if action == 'add': bot.send_message(chat_id, "Введите ID и название региона в формате:\n ") bot.register_next_step_handler_by_chat_id(chat_id, process_add_region) elif action == 'remove': bot.send_message(chat_id, "Введите ID региона, который хотите сделать неактивным") bot.register_next_step_handler_by_chat_id(chat_id, process_remove_region) ###################################################################################################################### # Handle admin region management commands ###################################################################################################################### class RegionManager: def __init__(self, db_path): self.db_path = db_path def add_region(self, region_id: int, region_name: str): with db_lock, sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Проверяем наличие региона cursor.execute('SELECT region_name, active FROM regions WHERE region_id = ?', (region_id,)) result = cursor.fetchone() if result: existing_region_name, active = result if existing_region_name == region_name: # Регион уже существует с этим именем cursor.execute('UPDATE regions SET active = TRUE WHERE region_id = ?', (region_id,)) conn.commit() return "activated", existing_region_name else: return "exists", existing_region_name else: # Добавляем новый регион cursor.execute('INSERT OR IGNORE INTO regions (region_id, region_name) VALUES (?, ?)', (region_id, region_name)) conn.commit() return "added", region_name def remove_region(self, region_id: int): with db_lock, sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # Проверяем, существует ли регион cursor.execute('SELECT COUNT(*) FROM regions WHERE region_id = ?', (region_id,)) count = cursor.fetchone()[0] if count == 0: return False # Регион не найден # Деактивируем регион и обновляем подписки cursor.execute('UPDATE regions SET active = FALSE WHERE region_id = ?', (region_id,)) cursor.execute('UPDATE subscriptions SET active = FALSE WHERE region_id = ? AND active = TRUE', (region_id,)) conn.commit() return True def log_event(self, chat_id: int, username: str, action: str): timestamp = time.strftime('%Y-%m-%d %H:%M:%S') with db_lock, sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() query = 'INSERT INTO user_events (chat_id, username, action, timestamp) VALUES (?, ?, ?, ?)' cursor.execute(query, (chat_id, username, action, timestamp)) conn.commit() region_manager = RegionManager(DB_PATH) def process_add_region(message): chat_id = message.chat.id username = f"@{message.from_user.username}" if message.from_user.username else "N/A" try: parts = message.text.split() if len(parts) < 2: raise ValueError("Неверный формат") region_id, region_name = parts[0], ' '.join(parts[1:]) status, existing_region_name = region_manager.add_region(region_id, region_name) if status == "activated": bot.send_message(chat_id, f"Регион {region_id} - {region_name} активирован.") region_manager.log_event(chat_id, username, f"Admin reactivated region {region_id} - {region_name}") user_state_manager.set_state(chat_id, "SETTINGS_MENU") show_settings_menu(chat_id) elif status == "added": bot.send_message(chat_id, f"Регион {region_id} - {region_name} добавлен.") region_manager.log_event(chat_id, username, f"Admin added region {region_id} - {region_name}") user_state_manager.set_state(chat_id, "SETTINGS_MENU") show_settings_menu(chat_id) elif status == "exists": markup = telebot.types.InlineKeyboardMarkup() markup.add(telebot.types.InlineKeyboardButton(text="Заменить", callback_data=f"replace_{region_id}_{urllib.parse.quote(region_name)}")) markup.add( telebot.types.InlineKeyboardButton(text="Активировать старый", callback_data=f"reactivate_{region_id}")) markup.add(telebot.types.InlineKeyboardButton(text="Отмена", callback_data=f"cancel_region_{chat_id}")) bot.send_message(chat_id, f"Регион {region_id} уже существует с названием {existing_region_name}. Хотите заменить его или активировать старый регион?", reply_markup=markup) user_state_manager.set_state(chat_id, "SETTINGS_MENU") show_settings_menu(chat_id) except (IndexError, ValueError): bot.send_message(chat_id, "Неверный формат. Используйте: ") except Exception as e: telebot.logger.error(f"Unexpected error: {e}") bot.send_message(chat_id, "Произошла ошибка при обработке запроса.") @bot.callback_query_handler(func=lambda call: call.data.startswith("replace_") or call.data.startswith( "reactivate_") or call.data.startswith("cancel_region_")) def handle_region_action(call): parts = call.data.split("_", 2) action = parts[0] region_id = parts[1] region_name = parts[2] if len(parts) > 2 else None chat_id = call.message.chat.id username = f"@{call.message.from_user.username}" if call.message.from_user.username else "N/A" if action == "replace": if region_name: region_name = urllib.parse.unquote(region_name) region_manager.add_region(region_id, region_name) bot.send_message(chat_id, f"Регион {region_id} обновлен до {region_name} и активирован.") region_manager.log_event(chat_id, username, f"Admin replaced and reactivated region {region_id} - {region_name}") telebot.logger.info(f"Admin {username} replaced and reactivated region {region_id} - {region_name}") user_state_manager.set_state(chat_id, "SETTINGS_MENU") elif action == "reactivate": region_manager.add_region(region_id, region_name) bot.send_message(chat_id, f"Регион {region_id} активирован.") region_manager.log_event(chat_id, username, f"Admin reactivated region {region_id} - {region_name}") telebot.logger.info(f"Admin {username} activate {region_id} - {region_name}") user_state_manager.set_state(chat_id, "SETTINGS_MENU") elif action == "cancel_region": bot.send_message(chat_id, "Действие отменено.") telebot.logger.info(f"Admin {username} canceled region actions.") user_state_manager.set_state(chat_id, "SETTINGS_MENU") bot.edit_message_reply_markup(chat_id=chat_id, message_id=call.message.message_id, reply_markup=None) bot.answer_callback_query(call.id) return show_settings_menu(chat_id) def process_remove_region(message): chat_id = message.chat.id username = f"@{message.from_user.username}" if message.from_user.username else "N/A" try: region_id = message.text.split()[0] success = region_manager.remove_region(region_id) if success: bot.send_message(chat_id, f"Регион {region_id} теперь неактивен, и все активные подписки обновлены.") region_manager.log_event(chat_id, username, f"Admin {username} deactivated region {region_id}") telebot.logger.info(f"Admin {username} deactivated region {region_id}") user_state_manager.set_state(chat_id, "SETTINGS_MENU") show_settings_menu(chat_id) else: bot.send_message(chat_id, f"Регион с ID {region_id} не существует.") user_state_manager.set_state(chat_id, "SETTINGS_MENU") show_settings_menu(chat_id) except IndexError: bot.send_message(chat_id, "Неверный формат. Используйте: ") ###################################################################################################################### ## ## ###################################################################################################################### # Handle displaying active subscriptions for a user def handle_my_subscriptions_button(message): chat_id = message.chat.id username = f"@{message.from_user.username}" if message.from_user.username else "N/A" if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.") telebot.logger.info(f"Unauthorized access attempt by {username} {chat_id}") return user_regions = get_user_subscribed_regions(chat_id) if not user_regions: bot.send_message(chat_id, "Вы не подписаны ни на один регион.") telebot.logger.debug(f"Запрашиваем {user_regions} for {username} {chat_id}") else: user_regions.sort(key=lambda x: int(x[0])) # Сортировка по числовому значению region_id regions_list = format_regions_list(user_regions) bot.send_message(chat_id, f"Ваши активные подписки:\n{regions_list}") telebot.logger.debug(f"Запрашиваем {user_regions} for {username} {chat_id}") show_settings_menu(chat_id) # Handle displaying all active regions def handle_active_regions_button(message): chat_id = message.chat.id username = f"@{message.from_user.username}" if message.from_user.username else "N/A" if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.") telebot.logger.info(f"Unauthorized access attempt by {username} {chat_id}") return regions = get_sorted_regions() # Используем функцию для получения отсортированных регионов if not regions: bot.send_message(chat_id, "Нет активных регионов.") else: regions_list = format_regions_list(regions) bot.send_message(chat_id, f"Активные регионы:\n{regions_list}") show_settings_menu(chat_id) def rabbitmq_connection(): # Создаем объект учетных данных credentials = pika.PlainCredentials(RABBITMQ_LOGIN,RABBITMQ_PASS) # Указываем параметры подключения, включая учетные данные parameters = pika.ConnectionParameters( host=RABBITMQ_HOST, credentials=credentials, # Передаем учетные данные heartbeat=600, # Интервал heartbeat для поддержания соединения blocked_connection_timeout=300 # Таймаут блокировки соединения ) # Создаем подключение и канал connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True) return connection, channel def send_to_queue(message): connection, channel = rabbitmq_connection() channel.basic_publish( exchange='', routing_key=RABBITMQ_QUEUE, body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) connection.close() # async def consume_from_queue(): # connection, channel = rabbitmq_connection() # # for method_frame, properties, body in channel.consume(RABBITMQ_QUEUE): # message = json.loads(body) # chat_id = message['chat_id'] # username = message['username'] # message_text = message['message'] # # try: # await send_notification_message(chat_id, message_text, username) # channel.basic_ack(method_frame.delivery_tag) # except Exception as e: # telebot.logger.error(f"Error sending message from queue: {e}") # # Optionally, you can nack the message to requeue it # # channel.basic_nack(method_frame.delivery_tag) # # connection.close() async def consume_from_queue(): while True: # Бесконечный цикл для переподключения try: # Подключение к RabbitMQ connection = await aio_pika.connect_robust(RABBITMQ_URL) async with connection: # Открываем канал channel = await connection.channel() # Объявляем очередь (если нужно) queue = await channel.declare_queue(RABBITMQ_QUEUE, durable=True) # Потребляем сообщения async for message in queue: async with message.process(): # Авто подтверждение сообщения try: # Парсинг сообщения data = json.loads(message.body) # Проверка структуры сообщения if not isinstance(data, dict): raise ValueError("Invalid message format: Expected a dictionary") # Извлечение необходимых данных chat_id = data.get("chat_id") username = data.get("username") message_text = data.get("message") # Проверка обязательных полей if not all([chat_id, username, message_text]): raise ValueError(f"Missing required fields in message: {data}") # Отправляем сообщение await send_notification_message(chat_id, message_text, username) except json.JSONDecodeError: # Логируем некорректный JSON telebot.logger.error(f"Failed to decode message: {message.body}") except ValueError as ve: # Логируем ошибку формата сообщения telebot.logger.error(f"Invalid message: {ve}") except Exception as e: # Логируем общую ошибку при обработке telebot.logger.error(f"Error sending message from queue: {e}") except aio_pika.exceptions.AMQPError as e: # Логируем ошибку RabbitMQ и переподключаемся telebot.logger.error(f"RabbitMQ error: {e}") except Exception as e: # Логируем общую ошибку и ждем перед переподключением telebot.logger.error(f"Critical error in consume_from_queue: {e}") finally: # Задержка перед переподключением await asyncio.sleep(5) async def send_message(chat_id, message, is_notification=False): try: if is_notification: await rate_limit_semaphore.acquire() parse_mode = 'HTML' # Используем partial для передачи именованных аргументов в bot.send_message func_with_args = partial(bot.send_message, chat_id=chat_id, text=message, parse_mode=parse_mode) # Передаем подготовленную функцию в run_in_executor await run_in_executor(func_with_args) except telebot.apihelper.ApiTelegramException as e: if "429" in str(e): telebot.logger.warning(f"Rate limit exceeded for chat_id {chat_id}. Retrying...") await asyncio.sleep(1) await send_message(chat_id, message, is_notification) elif "403" in str(e): telebot.logger.warning(f"Can't send message to user because bot blocked by user with chat id: {chat_id}") pass else: telebot.logger.error(f"Failed to send message to {chat_id}: {e}") telebot.logger.error(f"Detailed Error: {e}", exc_info=True) # Добавлено логирование исключения except Exception as e: username = f"@{message.from_user.username}" if message.from_user.username else "N/A" telebot.logger.error(f"Unexpected error while sending message to {username} {chat_id}: {e}", exc_info=True) await check_telegram_api() finally: if is_notification: rate_limit_semaphore.release() formatted_message = message.replace('\n', ' ').replace('\r', '') telebot.logger.info(f'Send notification to {chat_id} from RabbitMQ [{formatted_message}]') async def send_notification_message(chat_id, message, username): await send_message(chat_id, message, is_notification=True) # formatted_message = message.replace('\n', ' ').replace('\r', '') # telebot.logger.info(f'Send notification to {username} {chat_id} from RabbitMQ [{formatted_message}]') async def run_in_executor(func, *args): loop = asyncio.get_event_loop() with ThreadPoolExecutor() as pool: return await loop.run_in_executor(pool, func, *args) async def check_telegram_api(): try: async with aiohttp.ClientSession() as session: async with session.get('https://api.telegram.org') as response: if response.status == 200: telebot.logger.info("Telegram API is reachable.") else: telebot.logger.error("Telegram API is not reachable.") except Exception as e: telebot.logger.error(f"Error checking Telegram API: {e}") def extract_region_number(host): # Используем регулярное выражение для извлечения цифр после первого символа и до первой буквы match = re.match(r'^.\d+', host) if match: return match.group(0)[1:] # Возвращаем строку без первого символа return None def handle_notification_mode_button(message): chat_id = message.chat.id username = f"@{message.from_user.username}" if message.from_user.username else "N/A" telebot.logger.debug(f"Handling notification mode button for user {username} ({chat_id}).") if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы неавторизованы для использования этого бота") telebot.logger.warning(f"Unauthorized access attempt by {username} ({chat_id})") return # Логируем успешное авторизованное использование бота telebot.logger.info(f"User {username} ({chat_id}) is authorized and is selecting a notification mode.") # Отправляем клавиатуру выбора режима уведомлений markup = types.InlineKeyboardMarkup() markup.add(types.InlineKeyboardButton(text="Критические события", callback_data="notification_mode_disaster")) markup.add(types.InlineKeyboardButton(text="Все события", callback_data="notification_mode_all")) bot.send_message(chat_id, "Выберите уровень событий мониторинга, уведомление о которых хотите получать:\n" '1. Критические события (приоритет "DISASTER") - события, являющиеся потенциальными авариями и требующие оперативного решения.\nВ Zabbix обязательно имеют тег "CALL" для оперативного привлечения инженеров к устранению.\n\n' '2. Все события (По умолчанию) - критические события, а также события Zabbix высокого ("HIGH") приоритета, имеющие потенциально значительное влияние на сервис и требующее устранение в плановом порядке.', reply_markup=markup, parse_mode="HTML") telebot.logger.info(f"Sent notification mode selection message to {username} ({chat_id}).") @bot.callback_query_handler(func=lambda call: call.data.startswith("notification_mode_")) def handle_notification_mode_selection(call): chat_id = call.message.chat.id message_id = call.message.message_id mode = call.data.split("_")[2] telebot.logger.debug(f"User ({chat_id}) selected notification mode: {mode}.") # Убираем клавиатуру bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=None) telebot.logger.debug(f"Removed inline keyboard for user ({chat_id}).") # Обновляем режим уведомлений disaster_only = True if mode == "disaster" else False try: telebot.logger.debug(f"Attempting to update notification mode in the database for user {chat_id}.") with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() query = 'UPDATE subscriptions SET disaster_only = ? WHERE chat_id = ?' cursor.execute(query, (disaster_only, chat_id)) conn.commit() mode_text = "Критические события" if disaster_only else "Все события" bot.send_message(chat_id, f"Режим уведомлений успешно изменён на: {mode_text}") telebot.logger.info(f"Notification mode for user ({chat_id}) updated to: {mode_text}") # Логируем изменение состояния пользователя user_state_manager.set_state(chat_id, "SETTINGS_MENU") telebot.logger.debug(f"User state for {chat_id} set to SETTINGS_MENU.") # Показываем меню настроек show_settings_menu(chat_id) telebot.logger.debug(f"Displayed settings menu to {chat_id}.") except Exception as e: telebot.logger.error(f"Error updating notification mode for {chat_id}: {e}") bot.send_message(chat_id, "Произошла ошибка при изменении режима уведомлений.") finally: conn.close() telebot.logger.debug(f"Database connection closed for user {chat_id}.") # Логируем успешный ответ callback-запроса bot.answer_callback_query(call.id) telebot.logger.debug(f"Callback query for user ({chat_id}) answered.") # Фаза 1: Запрос активных событий и выбор региона с постраничным переключением def handle_active_triggers(message): chat_id = message.chat.id regions = get_sorted_regions() # Используем функцию get_regions для получения регионов start_index = 0 markup = create_region_keyboard(regions, start_index) bot.send_message(chat_id, "Выберите регион для получения активных событий:", reply_markup=markup) def create_region_keyboard(regions, start_index, regions_per_page=10): markup = types.InlineKeyboardMarkup() end_index = min(start_index + regions_per_page, len(regions)) # Создаём кнопки для регионов for i in range(start_index, end_index): region_id, region_name = regions[i] button = types.InlineKeyboardButton(text=f"{region_id}: {region_name}", callback_data=f"region_{region_id}") markup.add(button) # Добавляем кнопки для переключения страниц navigation_row = [] if start_index > 0: navigation_row.append(types.InlineKeyboardButton(text="<", callback_data=f"prev_{start_index}")) if end_index < len(regions): navigation_row.append(types.InlineKeyboardButton(text=">", callback_data=f"next_{end_index}")) if navigation_row: markup.row(*navigation_row) markup.row(types.InlineKeyboardButton(text='Отмена', callback_data='cancel_active_triggers')) return markup @bot.callback_query_handler( func=lambda call: call.data.startswith("region_") or call.data.startswith("prev_") or call.data.startswith( "next_")) def handle_region_pagination(call): chat_id = call.message.chat.id message_id = call.message.message_id data = call.data regions = get_sorted_regions() # Используем функцию get_regions для получения регионов regions_per_page = 10 # Если был выбран регион, то убираем клавиатуру и продолжаем выполнение функции if data.startswith("region_"): region_id = data.split("_")[1] bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=None) handle_region_selection(call, region_id) # Продолжаем выполнение функции после выбора региона # Если была нажата кнопка для переключения страниц elif data.startswith("prev_") or data.startswith("next_"): direction, index = data.split("_") index = int(index) # Рассчитываем новый индекс страницы start_index = max(0, index - regions_per_page) if direction == "prev" else min(len(regions) - regions_per_page, index) # Обновляем клавиатуру для новой страницы markup = create_region_keyboard(regions, start_index, regions_per_page) bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=markup) bot.answer_callback_query(call.id) # Фаза 2: Обработка выбора региона и предложить выбор группы def handle_region_selection(call, region_id): chat_id = call.message.chat.id try: # Получаем группы хостов для выбранного региона zapi = ZabbixAPI(ZABBIX_URL) zapi.login(api_token=ZABBIX_API_TOKEN) host_groups = zapi.hostgroup.get(output=["groupid", "name"], search={"name": region_id}) filtered_groups = [group for group in host_groups if 'test' not in group['name'].lower() and f'_{region_id}' in group['name']] # Если нет групп if not filtered_groups: bot.send_message(chat_id, "Нет групп хостов для этого региона.") show_main_menu(chat_id) return # Создаем клавиатуру с выбором группы или всех групп markup = types.InlineKeyboardMarkup() for group in filtered_groups: markup.add(types.InlineKeyboardButton(text=group['name'], callback_data=f"group_{group['groupid']}")) markup.add(types.InlineKeyboardButton(text="Все группы региона\n(Долгое выполнение)", callback_data=f"all_groups_{region_id}")) bot.send_message(chat_id, "Выберите группу хостов или получите события по всем группам региона:", reply_markup=markup) except Exception as e: bot.send_message(chat_id, f"Ошибка при подключении к Zabbix API.\n{str(e)}") show_main_menu(chat_id) # Фаза 3: Обработка выбора группы/всех групп и запрос периода @bot.callback_query_handler(func=lambda call: call.data.startswith("group_") or call.data.startswith("all_groups_")) def handle_group_or_all_groups(call): chat_id = call.message.chat.id message_id = call.message.message_id # Убираем клавиатуру после выбора группы bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=None) # Если выбрана конкретная группа if call.data.startswith("group_"): group_id = call.data.split("_")[1] get_triggers_for_group(chat_id, group_id) # Сразу получаем события для группы show_main_menu(chat_id) # Если выбраны все группы региона elif call.data.startswith("all_groups_"): region_id = call.data.split("_")[2] get_triggers_for_all_groups(chat_id, region_id) # Сразу получаем события для всех групп региона show_main_menu(chat_id) # Вспомогательная функция: получение событий для группы def get_triggers_for_group(chat_id, group_id): triggers = get_zabbix_triggers(group_id) # Получаем все активные события без периода if not triggers: bot.send_message(chat_id, f"Нет активных событий.") show_main_menu(chat_id) else: send_triggers_to_user(triggers, chat_id) def get_triggers_for_all_groups(chat_id, region_id): try: zapi = ZabbixAPI(ZABBIX_URL) zapi.login(api_token=ZABBIX_API_TOKEN) host_groups = zapi.hostgroup.get(output=["groupid", "name"], search={"name": region_id}) filtered_groups = [group for group in host_groups if 'test' not in group['name'].lower()] all_triggers = [] for group in filtered_groups: triggers = get_zabbix_triggers(group['groupid']) if triggers: all_triggers.extend(triggers) if all_triggers: send_triggers_to_user(all_triggers, chat_id) else: bot.send_message(chat_id, f"Нет активных событий.") show_main_menu(chat_id) except Exception as e: bot.send_message(chat_id, f"Ошибка при получении событий.\n{str(e)}") show_main_menu(chat_id) # Вспомогательная функция: отправка событий пользователю def send_triggers_to_user(triggers, chat_id): for trigger in triggers: bot.send_message(chat_id, trigger, parse_mode="html") time.sleep(1 / 5) def escape_telegram_chars(text): """ Экранирует запрещённые символы для Telegram API: < -> < > -> > & -> & Также проверяет на наличие запрещённых HTML-тегов и другие проблемы с форматированием. """ replacements = { '&': '&', '<': '<', '>': '>', '"': '"', # Для кавычек } # Применяем замены for char, replacement in replacements.items(): text = text.replace(char, replacement) return text def extract_host_from_name(name): match = re.match(r"^(.*?)\s*->", name) return match.group(1) if match else "Неизвестный хост" def get_zabbix_triggers(group_id): try: zapi = ZabbixAPI(ZABBIX_URL) zapi.login(api_token=ZABBIX_API_TOKEN) telebot.logger.info(f"Fetching active hosts for group {group_id}") # Получаем список активных хостов в группе active_hosts = zapi.host.get( groupids=group_id, output=["hostid", "name"], filter={"status": "0"} # Только включенные хосты ) if not active_hosts: telebot.logger.info(f"No active hosts found for group {group_id}") return [] host_ids = [host["hostid"] for host in active_hosts] telebot.logger.info(f"Found {len(host_ids)} active hosts in group {group_id}") # Получение активных проблем для этих хостов problems = zapi.problem.get( output=["eventid", "name", "severity", "clock"], hostids=host_ids, suppressed=0, acknowledged=0, filter={"severity": ["4", "5"]}, # Только высокий и аварийный уровень sortorder="ASC" ) if not problems: telebot.logger.info(f"No active problems found for group {group_id}") return [] # Получение IP-адресов хостов host_interfaces = zapi.hostinterface.get( hostids=host_ids, output=["hostid", "ip"] ) host_ip_map = {iface["hostid"]: iface["ip"] for iface in host_interfaces} print(host_ip_map) moscow_tz = timezone('Europe/Moscow') severity_map = {'4': 'HIGH', '5': 'DISASTER'} priority_map = {'4': '⚠️', '5': '⛔️'} problem_messages = [] for problem in problems: event_time_epoch = int(problem['clock']) event_time = datetime.fromtimestamp(event_time_epoch, tz=moscow_tz) event_time_formatted = event_time.strftime('%Y-%m-%d %H:%M:%S Мск') severity = severity_map.get(problem['severity'], 'Неизвестно') priority = priority_map.get(problem['severity'], '') description = problem.get('name', 'Нет описания') # Получаем хост из описания (или по-другому, если известно) host = extract_host_from_name(description) host_ip = host_ip_map.get(problem.get("hostid"), "Неизвестный IP") message = (f"{priority} Host: {host}\n" f"IP: {host_ip}\n" f"Описание: {description}\n" f"Критичность: {severity}\n" f"Время создания: {event_time_formatted}") problem_messages.append(message) return problem_messages except Exception as e: telebot.logger.error(f"Error fetching problems for group {group_id}: {e}") return None @app.route(BASE_URL + '/webhook', methods=['POST']) def webhook(): try: # Получаем данные и логируем data = request.get_json() app.logger.info(f"Получены данные: {data}") # Генерация хеша события и логирование event_hash = hash_data(data) app.logger.debug(f"Сгенерирован хеш для события: {event_hash}") # Работа с базой данных в блоке синхронизации with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # Проверяем количество записей в таблице событий cursor.execute('SELECT COUNT(*) FROM events') count = cursor.fetchone()[0] app.logger.debug(f"Текущее количество записей в таблице events: {count}") # Если записей >= 200, удаляем самое старое событие if count >= 200: query = 'DELETE FROM events WHERE id = (SELECT MIN(id) FROM events)' app.logger.debug(f"Удаление старого события: {query}") cursor.execute(query) # Извлечение номера региона из поля host region_id = extract_region_number(data.get("host")) if region_id is None: app.logger.error(f"Не удалось извлечь номер региона из host: {data.get('host')}") return jsonify({"status": "error", "message": "Invalid host format"}), 400 app.logger.debug(f"Извлечён номер региона: {region_id}") # Запрос подписчиков для отправки уведомления в зависимости от уровня критичности if data['severity'] == 'Disaster': # Авария query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE' else: # Высокая критичность query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE AND disaster_only = FALSE' app.logger.debug(f"Выполнение запроса: {query} для region_id={region_id}") cursor.execute(query, (region_id,)) results = cursor.fetchall() app.logger.debug(f"Найдено подписчиков: {len(results)} для региона {region_id}") # Проверка статуса региона (активен или нет) query = 'SELECT active FROM regions WHERE region_id = ?' cursor.execute(query, (region_id,)) region_row = cursor.fetchone() if region_row and region_row[0]: # Если регион активен app.logger.debug(f"Регион {region_id} активен. Начинаем рассылку сообщений.") message = format_message(data) undelivered = False # Отправляем сообщения подписчикам for chat_id, username in results: formatted_message = message.replace('\n',' ').replace('\r','') app.logger.info(f"Формирование сообщения для пользователя {username} (chat_id={chat_id}) [{formatted_message}]") try: send_to_queue({'chat_id': chat_id, 'username': username, 'message': message}) app.logger.debug(f"Сообщение поставлено в очередь для {chat_id} (@{username})") except Exception as e: app.logger.error(f"Ошибка при отправке сообщения для {chat_id} (@{username}): {e}") undelivered = True # Сохранение события, если были проблемы с доставкой if undelivered: query = 'INSERT OR IGNORE INTO events (hash, data, delivered) VALUES (?, ?, ?)' app.logger.debug(f"Сохранение события в базе данных: {query} (hash={event_hash}, delivered={False})") cursor.execute(query, (event_hash, str(data), False)) # Коммитим изменения в базе данных conn.commit() app.logger.debug("Изменения в базе данных успешно сохранены.") conn.close() # Возвращаем успешный ответ return jsonify({"status": "success"}), 200 except sqlite3.OperationalError as e: app.logger.error(f"Ошибка операции с базой данных: {e}") return jsonify({"status": "error", "message": "Ошибка работы с базой данных"}), 500 except ValueError as e: app.logger.error(f"Ошибка значения: {e}") return jsonify({"status": "error", "message": "Некорректные данные"}), 400 except Exception as e: app.logger.error(f"Неожиданная ошибка: {e}") return jsonify({"status": "error", "message": "Внутренняя ошибка сервера"}), 500 def format_message(data): try: priority_map = { 'High': '⚠️', 'Disaster': '⛔️' } priority = priority_map.get(data['severity']) msg = escape_telegram_chars(data['msg']) if data['status'].upper() == "PROBLEM": message = ( f"{priority} {data['host']} ({data['ip']})\n" f"Описание: {msg}\n" f"Критичность: {data['severity']}\n" f"Время возникновения: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n" ) if 'link' in data: message += f'URL: Ссылка на график' return message else: message = ( f"✅ {data['host']} ({data['ip']})\n" f"Описание: {msg}\n" f"Критичность: {data['severity']}\n" f"Проблема устранена!\n" f"Время устранения: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n" ) if 'link' in data: message += f'URL: Ссылка на график' return message except KeyError as e: app.logger.error(f"Missing key in data: {e}") raise ValueError(f"Missing key in data: {e}") def validate_chat_id(chat_id): """Validate that chat_id is composed only of digits.""" return chat_id.isdigit() def validate_telegram_id(telegram_id): """Validate that telegram_id starts with '@'.""" return telegram_id.startswith('@') def validate_email(email): """Validate that email domain is '@rtmis.ru'.""" return re.match(r'^[\w.-]+@rtmis\.ru$', email) is not None # Маршрут для добавления пользователя @app.route(BASE_URL + '/users/add', methods=['POST']) def add_user(): data = request.get_json() telegram_id = data.get('telegram_id') chat_id = data.get('chat_id') user_email = data.get('user_email') # DEBUG: Логирование полученных данных app.logger.debug(f"Получены данные для добавления пользователя: {data}") # Валидация данных if not validate_chat_id(chat_id): app.logger.warning(f"Ошибка валидации: некорректный chat_id: {chat_id}") return jsonify({"status": "failure", "reason": "Invalid data chat_id must be digit"}), 400 if not validate_telegram_id(telegram_id): app.logger.warning(f"Ошибка валидации: некорректный telegram_id: {telegram_id}") return jsonify({"status": "failure", "reason": "Invalid data telegram id must start from '@'"}), 400 if not validate_email(user_email): app.logger.warning(f"Ошибка валидации: некорректный email: {user_email}") return jsonify({"status": "failure", "reason": "Invalid data email address must be from rtmis"}), 400 if telegram_id and chat_id and user_email: try: # INFO: Попытка отправить сообщение пользователю app.logger.info(f"Отправка сообщения пользователю {telegram_id} с chat_id {chat_id}") bot.send_message(chat_id, "Регистрация пройдена успешно.") # DEBUG: Попытка добавления пользователя в whitelist app.logger.debug(f"Добавление пользователя {telegram_id} в whitelist") success = rundeck_add_to_whitelist(chat_id, telegram_id, user_email) if success: # INFO: Пользователь успешно добавлен в whitelist app.logger.info(f"Пользователь {telegram_id} добавлен в whitelist.") user_state_manager.set_state(chat_id, "MAIN_MENU") # DEBUG: Показ основного меню пользователю app.logger.debug(f"Отображение основного меню для пользователя с chat_id {chat_id}") show_main_menu(chat_id) return jsonify({"status": "success", "msg": f"User {telegram_id} with {user_email} added successfully"}), 200 else: # INFO: Пользователь уже существует в системе app.logger.info(f"Пользователь с chat_id {chat_id} уже существует.") return jsonify({"status": "failure", "msg": "User already exists"}), 400 except telebot.apihelper.ApiTelegramException as e: if e.result.status_code == 403: # INFO: Пользователь заблокировал бота app.logger.info(f"Пользователь {telegram_id} заблокировал бота") return jsonify({"status": "failure", "msg": f"User {telegram_id} is blocked chat with bot"}) elif e.result.status_code == 400: # WARNING: Пользователь неизвестен боту, возможно не нажал /start app.logger.warning(f"Пользователь {telegram_id} с chat_id {chat_id} неизвестен боту, возможно, не нажал /start") return jsonify({"status": "failure", "msg": f"User {telegram_id} with {chat_id} is unknown to the bot, did the user press /start button?"}) else: # ERROR: Неизвестная ошибка при отправке сообщения app.logger.error(f"Ошибка при отправке сообщения пользователю {telegram_id}: {str(e)}") return jsonify({"status": "failure", "msg": f"{e}"}) else: # ERROR: Ошибка валидации — недостаточно данных app.logger.error("Получены некорректные данные для добавления пользователя.") return jsonify({"status": "failure", "reason": "Invalid data"}), 400 @app.route(BASE_URL + '/users/del', methods=['POST']) def delete_user(): data = request.get_json() user_email = data.get('email') conn = sqlite3.connect(DB_PATH) try: # DEBUG: Получен запрос и начинается обработка app.logger.debug(f"Получен запрос на удаление пользователя. Данные: {data}") if not user_email: # WARNING: Ошибка валидации данных, email отсутствует app.logger.warning(f"Ошибка валидации: отсутствует email") return jsonify({"status": "failure", "message": "Email is required"}), 400 cursor = conn.cursor() # DEBUG: Запрос на получение chat_id app.logger.debug(f"Выполняется запрос на получение chat_id для email: {user_email}") cursor.execute("SELECT chat_id FROM whitelist WHERE user_email = ?", (user_email,)) user = cursor.fetchone() if user is None: # WARNING: Пользователь с указанным email не найден app.logger.warning(f"Пользователь с email {user_email} не найден") return jsonify({"status": "failure", "message": "User not found"}), 404 chat_id = user[0] # INFO: Удаление пользователя и его подписок начато app.logger.info(f"Начато удаление пользователя с email {user_email} и всех его подписок") # DEBUG: Удаление пользователя из whitelist app.logger.debug(f"Удаление пользователя с email {user_email} из whitelist") cursor.execute("DELETE FROM whitelist WHERE user_email = ?", (user_email,)) # DEBUG: Удаление подписок пользователя app.logger.debug(f"Удаление подписок для пользователя с chat_id {chat_id}") cursor.execute("DELETE FROM subscriptions WHERE chat_id = ?", (chat_id,)) conn.commit() # INFO: Пользователь и подписки успешно удалены app.logger.info(f"Пользователь с email {user_email} и все его подписки успешно удалены") return jsonify( {"status": "success", "message": f"User with email {user_email} and all subscriptions deleted."}), 200 except Exception as e: conn.rollback() # ERROR: Ошибка при удалении данных app.logger.error(f"Ошибка при удалении пользователя с email {user_email}: {str(e)}") return jsonify({"status": "failure", "message": str(e)}), 500 finally: conn.close() # DEBUG: Соединение с базой данных закрыто app.logger.debug(f"Соединение с базой данных закрыто") # Маршрут для получения информации о пользователях @app.route(BASE_URL + '/users/get', methods=['GET']) def get_users(): try: # INFO: Запрос на получение списка пользователей app.logger.info("Запрос на получение информации о пользователях получен") with db_lock: conn = sqlite3.connect(DB_PATH) cursor = conn.cursor() # DEBUG: Запрос данных из таблицы whitelist app.logger.debug("Запрос данных пользователей из таблицы whitelist") cursor.execute('SELECT * FROM whitelist') users = cursor.fetchall() # DEBUG: Формирование словаря пользователей app.logger.debug("Формирование словаря пользователей") users_dict = {id: {'id': id, 'username': username, 'email': email, 'events': [], 'worker': '', 'subscriptions': []} for id, username, email in users} # DEBUG: Запрос данных событий пользователей app.logger.debug("Запрос событий пользователей из таблицы user_events") cursor.execute('SELECT chat_id, username, action, timestamp FROM user_events') events = cursor.fetchall() # DEBUG: Обработка событий и добавление их в словарь пользователей for chat_id, username, action, timestamp in events: if chat_id in users_dict: event = {'type': action, 'date': timestamp} if "Subscribed to region" in action: region = action.split(": ")[-1] event['region'] = region users_dict[chat_id]['events'].append(event) # DEBUG: Запрос данных подписок пользователей app.logger.debug("Запрос активных подписок пользователей из таблицы subscriptions") cursor.execute('SELECT chat_id, region_id FROM subscriptions WHERE active = 1') subscriptions = cursor.fetchall() # DEBUG: Добавление подписок к пользователям for chat_id, region_id in subscriptions: if chat_id in users_dict: users_dict[chat_id]['subscriptions'].append(str(region_id)) # INFO: Формирование результата app.logger.info("Формирование результата для ответа") result = [] for user in users_dict.values(): ordered_user = { 'email': user['email'], 'username': user['username'], 'id': user['id'], 'worker': user['worker'], 'events': user['events'], 'subscriptions': ', '.join(user['subscriptions']) } result.append(ordered_user) # INFO: Успешная отправка данных пользователей app.logger.info("Информация о пользователях успешно отправлена") return jsonify(result) except Exception as e: # ERROR: Ошибка при получении информации о пользователях app.logger.error(f"Ошибка при получении информации о пользователях: {str(e)}") return jsonify({'status': 'error', 'message': str(e)}), 500 # Маршрут для отображения HTML-страницы с информацией о пользователях @app.route(BASE_URL + '/users', methods=['GET']) def view_users(): return render_template('users.html') # Маршрут для добавления региона @app.route(BASE_URL + '/regions/add', methods=['POST']) def add_region(): data = request.json region_id: int = data.get('region_id') region_name: str = data.get('region_name') if not region_id or not region_name: return jsonify({"status": "error", "message": "Invalid input"}), 400 result = region_api.add_region(region_id, region_name) return jsonify(result) # Маршрут для удаления региона @app.route(BASE_URL + '/regions/del', methods=['POST']) def del_region(): data = request.json region_id = data.get('region_id') if not region_id: return jsonify({"status": "error", "message": "Invalid input"}), 400 result = region_api.remove_region(region_id) return jsonify(result) # Маршрут для получения списка регионов @app.route(BASE_URL + '/regions/get', methods=['GET']) def get_regions(): regions = region_api.get_regions() return jsonify(regions) @app.route(BASE_URL + '/regions/edit', methods=['POST']) def edit_region(): data = request.json region_id = data.get('region_id') active = data.get('active') # Проверка валидности данных if not region_id and active: return jsonify({"status": "error", "message": "Invalid data received"}), 400 # Обновление региона result = region_api.update_region_status(region_id, active) return jsonify(result) # Маршрут для рендеринга страницы управления регионами @app.route(BASE_URL + '/regions', methods=['GET']) def regions_page(): return render_template('regions.html') # Управление уровнями логирования для Flask @app.route(BASE_URL + '/debug/flask', methods=['POST']) def toggle_flask_debug(): try: data = request.get_json() level = data.get('level').upper() if level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']: return jsonify({'status': 'error', 'message': 'Invalid log level'}), 400 log_level = getattr(logging, level, logging.DEBUG) app.logger.setLevel(log_level) for handler in app.logger.handlers: handler.setLevel(log_level) return jsonify({'status': 'success', 'level': level}) except Exception as e: return jsonify({'status': 'error', 'message': str(e)}), 500 # Управление уровнями логирования для Telebot @app.route(BASE_URL + '/debug/telebot', methods=['POST']) def toggle_telebot_debug(): try: data = request.get_json() level = data.get('level').upper() if level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']: return jsonify({'status': 'error', 'message': 'Invalid log level'}), 400 log_level = getattr(logging, level, logging.DEBUG) telebot.logger.setLevel(log_level) for handler in telebot.logger.handlers: handler.setLevel(log_level) return jsonify({'status': 'success', 'level': level}) except Exception as e: return jsonify({'status': 'error', 'message': str(e)}), 500 # Test functions for admin def simulate_event(message): chat_id = message.chat.id test_event = { "host": "12", "msg": "Тестовое сообщение", "date_reception": "12757175125", "severity": "5", "tags": "OTC,OFS,NFS", "status": "Авария!" } app.logger.info(f"Simulating event: {test_event}") # Use requests to simulate a POST request response = requests.post('http://localhost:5000/webhook', json=test_event) app.logger.info(f"Response from webhook: {response.status_code} - {response.text}") bot.send_message(chat_id, f"Тестовое событие отправлено. Статус ответа: {response.status_code}") def simulate_triggers(message): chat_id = message.chat.id regions = ["12", "19", "35", "40"] trigger_messages = [] for region_id in regions: triggers = get_zabbix_triggers(region_id) if triggers: trigger_messages.append(f"Регион {region_id}:\n{triggers}") if trigger_messages: bot.send_message(chat_id, "\n\n".join(trigger_messages), parse_mode="html") else: bot.send_message(chat_id, "Нет активных проблем по указанным регионам.") def run_polling(): bot.infinity_polling(timeout=10, long_polling_timeout = 5) # Запуск Flask-приложения def run_flask(): app.run(port=5000, host='0.0.0.0', debug=True, use_reloader=False) # Основная функция для запуска def main(): # Инициализация базы данных init_db() # Запуск Flask и бота в отдельных потоках Thread(target=run_flask, daemon=True).start() Thread(target=run_polling, daemon=True).start() # Запуск асинхронных задач asyncio.run(consume_from_queue()) if __name__ == '__main__': main()