import os from flask import Flask, request, jsonify import schedule from dotenv import load_dotenv import hashlib import telebot import logging from logging.config import dictConfig import zipfile from threading import Thread, Lock, Timer import sqlite3 import time import asyncio import aiohttp import pika import json from concurrent.futures import ThreadPoolExecutor from pyzabbix import ZabbixAPI import requests from pytz import timezone from datetime import datetime, timedelta import re # Load environment variables load_dotenv() class UTF8StreamHandler(logging.StreamHandler): def __init__(self, stream=None): super().__init__(stream) self.setStream(stream) def setStream(self, stream): super().setStream(stream) if hasattr(stream, 'reconfigure'): stream.reconfigure(encoding='utf-8') # Определение пути к основному лог-файлу LOG_FILE = 'logs/app.log' # Определение функции архивирования логов def archive_old_logs(): # Получаем дату предыдущего дня yesterday_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') # Проверяем существует ли основной лог-файл if os.path.exists(LOG_FILE): # Путь к архиву в той же папке archive_name = f"app_{yesterday_date}.zip" archive_path = os.path.join(os.path.dirname(LOG_FILE), archive_name) # Создание архива и добавление лог-файла with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf: zipf.write(LOG_FILE, arcname=os.path.basename(LOG_FILE)) # Удаление старого лог-файла после архивирования os.remove(LOG_FILE) class FilterByMessage(logging.Filter): def filter(self, record): # Фильтруем сообщения, содержащие 'Received 1 new updates' return 'Received ' not in record.getMessage() # Initialize Flask application app = Flask(__name__) # Настройка логирования dictConfig({ 'version': 1, 'formatters': { 'default': { 'format': '[%(asctime)s] %(levelname)s %(module)s: %(message)s', }, }, 'handlers': { 'console': { 'class': 'telezab.UTF8StreamHandler', # Замените на путь к вашему классу UTF8StreamHandler 'stream': 'ext://sys.stdout', # Вывод в консоль 'formatter': 'default', 'filters': ['filter_by_message'] }, 'file': { 'class': 'logging.FileHandler', 'filename': 'app.log', # Запись в файл 'formatter': 'default', 'encoding': 'utf-8', # Кодировка файла }, }, 'filters': { 'filter_by_message': { '()': FilterByMessage, }, }, 'loggers': { 'flask': { 'level': 'DEBUG', 'handlers': ['console', 'file'], 'propagate': False, }, 'telebot': { 'level': 'DEBUG', 'handlers': ['console', 'file'], 'propagate': False, }, }, 'root': { 'level': 'DEBUG', 'handlers': ['console', 'file'], } }) # Настройка уровня логирования для Flask app.logger.setLevel(logging.DEBUG) # Настройка pyTelegramBotAPI logger telebot.logger = logging.getLogger('telebot') # Get the token from environment variables TOKEN = os.getenv('TELEGRAM_TOKEN') ZABBIX_URL = os.getenv('ZABBIX_URL') ZABBIX_API_TOKEN = os.getenv('ZABBIX_API_TOKEN') if not TOKEN or not ZABBIX_URL or not ZABBIX_API_TOKEN: raise ValueError("One or more required environment variables are missing") ADMIN_CHAT_IDS = os.getenv('ADMIN_CHAT_IDS', '').split(',') bot = telebot.TeleBot(TOKEN) # Lock for database operations db_lock = Lock() # Semaphore for rate limiting rate_limit_semaphore = asyncio.Semaphore(25) # 25 messages per second # Define states NOTIFICATION_MODE = 1 SETTINGS_MODE = 2 # Dictionary to keep track of user states and timers user_states = {} user_timers = {} def init_db(): global st st = datetime.now() try: with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() # Create events table cursor.execute('''CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT UNIQUE, data TEXT, delivered BOOLEAN)''') # Create subscriptions table with username and active flag cursor.execute('''CREATE TABLE IF NOT EXISTS subscriptions ( chat_id INTEGER, region_id TEXT, username TEXT, active BOOLEAN DEFAULT TRUE, skip BOOLEAN DEFAULT FALSE, UNIQUE(chat_id, region_id))''') # Create whitelist table cursor.execute('''CREATE TABLE IF NOT EXISTS whitelist ( chat_id INTEGER PRIMARY KEY)''') # Create regions table with active flag cursor.execute('''CREATE TABLE IF NOT EXISTS regions ( region_id TEXT PRIMARY KEY, region_name TEXT, active BOOLEAN DEFAULT TRUE)''') # Create user events table for logging cursor.execute('''CREATE TABLE IF NOT EXISTS user_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, chat_id INTEGER, username TEXT, action TEXT, timestamp TEXT)''') # Insert sample regions cursor.execute('''INSERT OR IGNORE INTO regions (region_id, region_name) VALUES ('01', 'Адыгея'), ('02', 'Башкортостан (Уфа)'), ('04', 'Алтай'), ('19', 'Республика Хакасия')''') conn.commit() app.logger.info("Database initialized successfully.") except Exception as e: app.logger.error(f"Error initializing database: {e}") finally: conn.close() app.logger.info(f"init_db completed in {datetime.now() - st}") # Hash the incoming data def hash_data(data): return hashlib.sha256(str(data).encode('utf-8')).hexdigest() # Check if user is in whitelist def is_whitelisted(chat_id): with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() query = 'SELECT COUNT(*) FROM whitelist WHERE chat_id = ?' telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}") cursor.execute(query, (chat_id,)) count = cursor.fetchone()[0] conn.close() return count > 0 # Add user to whitelist def add_to_whitelist(chat_id, username): with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() query = 'INSERT OR IGNORE INTO whitelist (chat_id, username) VALUES (?, ?)' telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}, username={username}") cursor.execute(query, (chat_id, username)) conn.commit() conn.close() # Remove user from whitelist def remove_from_whitelist(chat_id): with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() query = 'DELETE FROM whitelist WHERE chat_id = ?' telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}") cursor.execute(query, (chat_id,)) conn.commit() conn.close() # Get list of regions def get_regions(): with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() cursor.execute('SELECT region_id, region_name FROM regions WHERE active = TRUE ORDER BY region_id') regions = cursor.fetchall() conn.close() return regions def get_sorted_regions(): with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() cursor.execute('SELECT region_id, region_name FROM regions WHERE active = TRUE') regions = cursor.fetchall() conn.close() # Сортируем регионы по числовому значению region_id regions.sort(key=lambda x: int(x[0])) return regions # Check if region exists def region_exists(region_id): with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM regions WHERE region_id = ? AND active = TRUE', (region_id,)) count = cursor.fetchone()[0] conn.close() return count > 0 # Get list of regions a user is subscribed to def get_user_subscribed_regions(chat_id): with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() cursor.execute(''' SELECT regions.region_id, regions.region_name FROM subscriptions JOIN regions ON subscriptions.region_id = regions.region_id WHERE subscriptions.chat_id = ? AND subscriptions.active = TRUE AND subscriptions.skip = FALSE ORDER BY regions.region_id ''', (chat_id,)) regions = cursor.fetchall() conn.close() return regions # Check if user is subscribed to a region def is_subscribed(chat_id, region_id): with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() cursor.execute(''' SELECT COUNT(*) FROM subscriptions WHERE chat_id = ? AND region_id = ? AND active = TRUE AND skip = FALSE ''', (chat_id, region_id)) count = cursor.fetchone()[0] conn.close() return count > 0 # Format regions list def format_regions_list(regions): return '\n'.join([f"{region_id} - {region_name}" for region_id, region_name in regions]) def log_user_event(chat_id, username, action): timestamp = time.strftime('%Y-%m-%d %H:%M:%S') try: with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() query = 'INSERT INTO user_events (chat_id, username, action, timestamp) VALUES (?, ?, ?, ?)' telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}, username={username}, action={action}, timestamp={timestamp}") cursor.execute(query, (chat_id, username, action, timestamp)) conn.commit() telebot.logger.info(f"User event logged: {chat_id} ({username}) - {action} at {timestamp}.") except Exception as e: telebot.logger.error(f"Error logging user event: {e}") finally: conn.close() # Handle state transitions def set_user_state(chat_id, state): user_states[chat_id] = state if state == SETTINGS_MODE: start_settings_timer(chat_id) elif state == NOTIFICATION_MODE: cancel_settings_timer(chat_id) def start_settings_timer(chat_id): if chat_id in user_timers: user_timers[chat_id].cancel() timer = Timer(30, transition_to_notification_mode, [chat_id]) user_timers[chat_id] = timer timer.start() def cancel_settings_timer(chat_id): if chat_id in user_timers: user_timers[chat_id].cancel() del user_timers[chat_id] def reset_settings_timer(chat_id): if chat_id in user_timers: user_timers[chat_id].cancel() start_settings_timer(chat_id) def transition_to_notification_mode(chat_id): set_user_state(chat_id, NOTIFICATION_MODE) bot.send_message(chat_id, "Вы были автоматически переведены в режим получения уведомлений.") show_main_menu(chat_id) telebot.logger.info(f"User {chat_id} automatically transitioned to notification mode.") # Main menu for users def show_main_menu(chat_id): markup = telebot.types.ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True) if is_whitelisted(chat_id): markup.add('Настройки', 'Помощь', 'Активные триггеры') else: markup.add('Регистрация') bot.send_message(chat_id, "Выберите действие:", reply_markup=markup) # Settings menu for users def show_settings_menu(chat_id): markup = telebot.types.ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True) if str(chat_id) in ADMIN_CHAT_IDS: markup.add('Подписаться', 'Отписаться', 'Мои подписки', 'Активные регионы', 'Добавить регион', 'Удалить регион', 'Назад') markup.add('Тестовое событие', 'Тест триггеров') else: markup.add('Подписаться', 'Отписаться', 'Мои подписки', 'Активные регионы', 'Назад') bot.send_message(chat_id, "Вы находитесь в режиме настроек. Выберите действие:", reply_markup=markup) # Handle /start command @bot.message_handler(commands=['start']) def handle_start(message): chat_id = message.chat.id username = message.from_user.username if username: username = f"@{username}" else: username = "N/A" set_user_state(chat_id, NOTIFICATION_MODE) show_main_menu(chat_id) telebot.logger.info(f"User {chat_id} ({username}) started with command /start.") # Handle menu button presses @bot.message_handler(func=lambda message: True) def handle_menu_selection(message): chat_id = message.chat.id text = message.text.strip().lower() if user_states.get(chat_id, NOTIFICATION_MODE) == SETTINGS_MODE: reset_settings_timer(chat_id) handle_settings_menu_selection(message) else: if text == 'регистрация': handle_register(message) elif text == 'настройки': set_user_state(chat_id, SETTINGS_MODE) show_settings_menu(chat_id) elif text == 'помощь': handle_help(message) elif text == 'активные триггеры': handle_active_triggers(message) else: bot.send_message(chat_id, "Команда не распознана или у вас нет прав для выполнения этой команды.") show_main_menu(chat_id) # Handle settings menu button presses def handle_settings_menu_selection(message): chat_id = message.chat.id text = message.text.strip().lower() reset_settings_timer(chat_id) if text == 'подписаться': handle_subscribe(message) elif text == 'отписаться': handle_unsubscribe(message) elif text == 'мои подписки': handle_my_subscriptions(message) elif text == 'активные регионы': handle_active_regions(message) elif text == 'добавить регион' and str(chat_id) in ADMIN_CHAT_IDS: prompt_admin_for_region(chat_id, 'add') elif text == 'удалить регион' and str(chat_id) in ADMIN_CHAT_IDS: prompt_admin_for_region(chat_id, 'remove') elif text == 'тестовое событие' and str(chat_id) in ADMIN_CHAT_IDS: simulate_event(message) elif text == 'тест триггеров' and str(chat_id) in ADMIN_CHAT_IDS: simulate_triggers(message) elif text == 'назад': set_user_state(chat_id, NOTIFICATION_MODE) show_main_menu(chat_id) else: bot.send_message(chat_id, "Команда не распознана.") show_settings_menu(chat_id) # Handle /subscribe command to subscribe to a region @bot.message_handler(commands=['subscribe', 'sub']) def handle_subscribe(message): chat_id = message.chat.id if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.") telebot.logger.info(f"Unauthorized access attempt by {chat_id}") return username = message.from_user.username if username: username = f"@{username}" else: username = "N/A" regions_list = format_regions_list(get_sorted_regions()) bot.send_message(chat_id, f"Отправьте номер или номера регионов, на которые хотите подписаться (через запятую):\n{regions_list}\n\nНапишите 'отмена' для отмены.") bot.register_next_step_handler_by_chat_id(chat_id, process_subscription, chat_id, username) def process_subscription(message, chat_id, username): if message.text.lower() == 'отмена': bot.send_message(chat_id, "Действие отменено.") return show_settings_menu(chat_id) region_ids = message.text.split(',') valid_region_ids = get_regions() valid_region_ids = [region[0] for region in valid_region_ids] with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() for region_id in region_ids: region_id = region_id.strip() if not region_id.isdigit() or region_id not in valid_region_ids: bot.send_message(chat_id, f"Регион с ID {region_id} не существует или недопустимый формат. Введите только существующие номера регионов.") return show_settings_menu(chat_id) query = 'INSERT OR IGNORE INTO subscriptions (chat_id, region_id, username, active) VALUES (?, ?, ?, TRUE)' telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}, region_id={region_id}, username={username}") cursor.execute(query, (chat_id, region_id, username)) if cursor.rowcount == 0: query = 'UPDATE subscriptions SET active = TRUE WHERE chat_id = ? AND region_id = ?' telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}, region_id={region_id}") cursor.execute(query, (chat_id, region_id)) conn.commit() conn.close() bot.send_message(chat_id, f"Подписка на регионы: {', '.join(region_ids)} оформлена.") telebot.logger.info(f"User {chat_id} ({username}) subscribed to regions: {', '.join(region_ids)}.") log_user_event(chat_id, username, f"Subscribed to regions: {', '.join(region_ids)}") show_settings_menu(chat_id) # Handle /unsubscribe command to unsubscribe from a region @bot.message_handler(commands=['unsubscribe']) def handle_unsubscribe(message): chat_id = message.chat.id if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.") telebot.logger.info(f"Unauthorized access attempt by {chat_id}") return user_regions = get_user_subscribed_regions(chat_id) if not user_regions: bot.send_message(chat_id, "Вы не подписаны ни на один регион.") return show_settings_menu(chat_id) else: regions_list = format_regions_list(user_regions) bot.send_message(chat_id, f"Отправьте номер или номера регионов, от которых хотите отписаться (через запятую):\n{regions_list}\n\nНапишите 'отмена' для отмены.") bot.register_next_step_handler_by_chat_id(chat_id, process_unsubscription, chat_id) # Пример функции, которая использует bot и log def process_unsubscription(message, chat_id): if message.text.lower() == 'отмена': bot.send_message(chat_id, "Действие отменено.") return show_settings_menu(chat_id) region_ids = message.text.split(',') valid_region_ids = get_user_subscribed_regions(chat_id) valid_region_ids = [region[0] for region in valid_region_ids] with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() for region_id in region_ids: region_id = region_id.strip() if not region_id.isdigit() or region_id not in valid_region_ids: bot.send_message(chat_id, f"Регион с ID {region_id} не существует или недопустимый формат. Введите только номера регионов, на которые вы подписаны.") return show_settings_menu(chat_id) query = 'UPDATE subscriptions SET active = FALSE WHERE chat_id = ? AND region_id = ?' telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}, region_id={region_id}") cursor.execute(query, (chat_id, region_id)) conn.commit() conn.close() bot.send_message(chat_id, f"Отписка от регионов: {', '.join(region_ids)} оформлена.") telebot.logger.info(f"User {chat_id} unsubscribed from regions: {', '.join(region_ids)}.") username = "@" + message.from_user.username if message.from_user.username else "N/A" log_user_event(chat_id, username, f"Unsubscribed from regions: {', '.join(region_ids)}") show_settings_menu(chat_id) # Handle /help command to provide instructions @bot.message_handler(commands=['help']) def handle_help(message): help_text = ( "/start - Показать меню бота\n" "Настройки - Перейти в режим настроек и управлять подписками\n" "Помощь - Показать это сообщение" ) bot.send_message(message.chat.id, help_text) show_main_menu(message.chat.id) # Handle /register command for new user registration @bot.message_handler(commands=['register']) def handle_register(message): chat_id = message.chat.id username = message.from_user.username if username: username = f"@{username}" else: username = "N/A" bot.send_message(chat_id, f"Ваш chat ID: {chat_id}, ваше имя пользователя: {username}. Запрос на одобрение отправлен администратору.") log_user_event(chat_id, username, "Requested registration") for admin_chat_id in ADMIN_CHAT_IDS: markup = telebot.types.InlineKeyboardMarkup() markup.add( telebot.types.InlineKeyboardButton(text="Подтвердить", callback_data=f"approve_{chat_id}_{username}"), telebot.types.InlineKeyboardButton(text="Отменить", callback_data=f"decline_{chat_id}_{username}") ) bot.send_message( admin_chat_id, f"Пользователь {username} ({chat_id}) запрашивает регистрацию. Вы подтверждаете это действие?", reply_markup=markup ) @bot.callback_query_handler(func=lambda call: call.data.startswith("approve_") or call.data.startswith("decline_")) def handle_admin_response(call): action, chat_id, username = call.data.split("_") if action == "approve": add_to_whitelist(int(chat_id), username) bot.send_message(chat_id, "Ваша регистрация одобрена администратором.") bot.send_message(call.message.chat.id, f"Пользователь {username} ({chat_id}) добавлен в белый список.") log_user_event(chat_id, username, "Approved registration") elif action == "decline": bot.send_message(chat_id, "Ваша регистрация отклонена администратором.") bot.send_message(call.message.chat.id, f"Пользователь {username} ({chat_id}) отклонен.") log_user_event(chat_id, username, "Declined registration") bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None) bot.answer_callback_query(call.id) def process_register(message, chat_id, username): if message.text.lower() == 'отмена': bot.send_message(chat_id, "Регистрация отменена.") show_main_menu(chat_id) return if message.text.lower() == 'подтвердить регистрацию': for admin_chat_id in ADMIN_CHAT_IDS: markup = telebot.types.ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True) markup.add(f'/add_whitelist {chat_id}', 'Отмена') bot.send_message( admin_chat_id, f"Пользователь {username} ({chat_id}) запрашивает регистрацию.\n" f"Вы подтверждаете это действие?", reply_markup=markup ) bot.send_message(chat_id, "Запрос отправлен администратору для одобрения.") telebot.logger.info(f"User {chat_id} ({username}) requested registration.") else: bot.send_message(chat_id, "Некорректный выбор. Регистрация отменена.") show_main_menu(chat_id) # Handle admin region management commands def prompt_admin_for_region(chat_id, action): if action == 'add': bot.send_message(chat_id, "Введите ID и название региона в формате: ") bot.register_next_step_handler_by_chat_id(chat_id, process_add_region) elif action == 'remove': bot.send_message(chat_id, "Введите ID региона, который хотите сделать неактивным") bot.register_next_step_handler_by_chat_id(chat_id, process_remove_region) def process_add_region(message): chat_id = message.chat.id try: region_id, region_name = message.text.split()[0], ' '.join(message.text.split()[1:]) with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() query = 'SELECT region_name, active FROM regions WHERE region_id = ?' telebot.logger.debug(f"Executing query: {query} with region_id={region_id}") cursor.execute(query, (region_id,)) result = cursor.fetchone() if result: existing_region_name, active = result if existing_region_name == region_name: query = 'UPDATE regions SET active = TRUE WHERE region_id = ?' telebot.logger.debug(f"Executing query: {query} with region_id={region_id}") cursor.execute(query, (region_id,)) bot.send_message(chat_id, f"Регион {region_id} - {region_name} активирован.") telebot.logger.info(f"Admin {chat_id} reactivated region {region_id} - {region_name}.") else: markup = telebot.types.InlineKeyboardMarkup() markup.add(telebot.types.InlineKeyboardButton(text="Заменить", callback_data=f"replace_{region_id}_{region_name}")) markup.add(telebot.types.InlineKeyboardButton(text="Активировать старый", callback_data=f"reactivate_{region_id}")) markup.add(telebot.types.InlineKeyboardButton(text="Отмена", callback_data=f"cancel_region_{chat_id}")) bot.send_message(chat_id, f"Регион {region_id} уже существует с названием {existing_region_name}. Хотите заменить его или активировать старый регион?", reply_markup=markup) else: query = 'INSERT OR IGNORE INTO regions (region_id, region_name) VALUES (?, ?)' telebot.logger.debug(f"Executing query: {query} with region_id={region_id}, region_name={region_name}") cursor.execute(query, (region_id, region_name)) bot.send_message(chat_id, f"Регион {region_id} - {region_name} добавлен.") telebot.logger.info(f"Admin {chat_id} added region {region_id} - {region_name}.") conn.commit() conn.close() except (IndexError, ValueError): bot.send_message(chat_id, "Неверный формат. Используйте: ") @bot.callback_query_handler(func=lambda call: call.data.startswith("replace_") or call.data.startswith("reactivate_") or call.data.startswith("cancel_region_")) def handle_region_action(call): parts = call.data.split("_", 2) action = parts[0] region_id = parts[1] region_name = parts[2] if len(parts) > 2 else None chat_id = call.message.chat.id if not region_name: bot.send_message(chat_id, "Ошибка: Недостаточно данных для выполнения действия.") bot.answer_callback_query(call.id) # Завершение обработки callback bot.delete_message(chat_id, call.message.message_id) return show_settings_menu(chat_id) with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() if action == "replace": query = 'UPDATE regions SET region_name = ?, active = TRUE WHERE region_id = ?' telebot.logger.debug(f"Executing query: {query} with region_name={region_name}, region_id={region_id}") cursor.execute(query, (region_name, region_id)) bot.send_message(chat_id, f"Регион {region_id} обновлен до {region_name} и активирован.") telebot.logger.info(f"Admin {chat_id} replaced and reactivated region {region_id} with {region_name}.") elif action == "reactivate": query = 'UPDATE regions SET active = TRUE WHERE region_id = ?' telebot.logger.debug(f"Executing query: {query} with region_id={region_id}") cursor.execute(query, (region_id,)) bot.send_message(chat_id, f"Регион {region_id} активирован.") telebot.logger.info(f"Admin {chat_id} reactivated region {region_id}.") elif action == "cancel_region": bot.send_message(chat_id, "Действие отменено.") telebot.logger.info(f"Admin {chat_id} canceled region action.") conn.commit() conn.close() bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None) show_settings_menu(chat_id) bot.answer_callback_query(call.id) # Завершение обработки callback def process_remove_region(message): chat_id = message.chat.id try: region_id = message.text.split()[0] with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() # Проверка существования региона query = 'SELECT COUNT(*) FROM regions WHERE region_id = ?' cursor.execute(query, (region_id,)) count = cursor.fetchone()[0] if count == 0: bot.send_message(chat_id, f"Регион с ID {region_id} не существует.") return show_settings_menu(chat_id) query = 'UPDATE regions SET active = FALSE WHERE region_id = ?' telebot.logger.debug(f"Executing query: {query} with region_id={region_id}") cursor.execute(query, (region_id,)) query = 'UPDATE subscriptions SET active = FALSE WHERE region_id = ? AND active = TRUE' telebot.logger.debug(f"Executing query: {query} with region_id={region_id}") cursor.execute(query, (region_id,)) conn.commit() conn.close() bot.send_message(chat_id, f"Регион {region_id} теперь неактивен и все активные подписки обновлены.") telebot.logger.info(f"Admin {chat_id} set region {region_id} to inactive and updated subscriptions.") except IndexError: bot.send_message(chat_id, "Неверный формат. Используйте: ") show_settings_menu(chat_id) # Handle displaying active subscriptions for a user def handle_my_subscriptions(message): chat_id = message.chat.id if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.") telebot.logger.info(f"Unauthorized access attempt by {chat_id}") return user_regions = get_user_subscribed_regions(chat_id) if not user_regions: bot.send_message(chat_id, "Вы не подписаны ни на один регион.") else: user_regions.sort(key=lambda x: int(x[0])) # Сортировка по числовому значению region_id regions_list = format_regions_list(user_regions) bot.send_message(chat_id, f"Ваши активные подписки:\n{regions_list}") show_settings_menu(chat_id) # Handle displaying all active regions def handle_active_regions(message): chat_id = message.chat.id if not is_whitelisted(chat_id): bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.") telebot.logger.info(f"Unauthorized access attempt by {chat_id}") return regions = get_sorted_regions() # Используем функцию для получения отсортированных регионов if not regions: bot.send_message(chat_id, "Нет активных регионов.") else: regions_list = format_regions_list(regions) bot.send_message(chat_id, f"Активные регионы:\n{regions_list}") show_settings_menu(chat_id) # RabbitMQ configuration RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost') RABBITMQ_QUEUE = 'telegram_notifications' def rabbitmq_connection(): connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST)) channel = connection.channel() channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True) return connection, channel def send_to_queue(message): connection, channel = rabbitmq_connection() channel.basic_publish( exchange='', routing_key=RABBITMQ_QUEUE, body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) connection.close() async def consume_from_queue(): connection, channel = rabbitmq_connection() for method_frame, properties, body in channel.consume(RABBITMQ_QUEUE): message = json.loads(body) chat_id = message['chat_id'] message_text = message['message'] try: await send_notification_message(chat_id, message_text) channel.basic_ack(method_frame.delivery_tag) except Exception as e: telebot.logger.error(f"Error sending message from queue: {e}") # Optionally, you can nack the message to requeue it # channel.basic_nack(method_frame.delivery_tag) connection.close() async def send_message(chat_id, message, is_notification=False): try: if is_notification: await rate_limit_semaphore.acquire() await run_in_executor(bot.send_message, chat_id, message) except telebot.apihelper.ApiTelegramException as e: if "429" in str(e): telebot.logger.warning(f"Rate limit exceeded for chat_id {chat_id}. Retrying...") await asyncio.sleep(1) await send_message(chat_id, message, is_notification) else: telebot.logger.error(f"Failed to send message to {chat_id}: {e}") except Exception as e: telebot.logger.error(f"Error sending message to {chat_id}: {e}") await check_telegram_api() finally: if is_notification: rate_limit_semaphore.release() async def send_notification_message(chat_id, message): await send_message(chat_id, message, is_notification=True) async def run_in_executor(func, *args): loop = asyncio.get_event_loop() with ThreadPoolExecutor() as pool: return await loop.run_in_executor(pool, func, *args) async def check_telegram_api(): try: async with aiohttp.ClientSession() as session: async with session.get('https://api.telegram.org') as response: if response.status == 200: telebot.logger.info("Telegram API is reachable.") else: telebot.logger.error("Telegram API is not reachable.") except Exception as e: telebot.logger.error(f"Error checking Telegram API: {e}") def extract_region_number(host): # Используем регулярное выражение для извлечения цифр после первого символа и до первой буквы match = re.match(r'^.\d+', host) if match: return match.group(0)[1:] # Возвращаем строку без первого символа return None @app.route('/webhook', methods=['POST']) def webhook(): try: data = request.get_json() app.logger.info(f"Received data: {data}") event_hash = hash_data(data) with db_lock: conn = sqlite3.connect('telezab.db') cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM events') count = cursor.fetchone()[0] if count >= 200: query = 'DELETE FROM events WHERE id = (SELECT MIN(id) FROM events)' app.logger.debug(f"Executing query: {query}") cursor.execute(query) # Извлечение номера региона из поля host region_id = extract_region_number(data.get("host")) if region_id is None: app.logger.error(f"Failed to extract region number from host: {data.get('host')}") return jsonify({"status": "error", "message": "Invalid host format"}), 400 # Fetch chat_ids to send the alert query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE AND skip = FALSE' app.logger.debug(f"Executing query: {query} with region_id={region_id}") cursor.execute(query, (region_id,)) results = cursor.fetchall() # Check if the region is active query = 'SELECT active FROM regions WHERE region_id = ?' cursor.execute(query, (region_id,)) region_row = cursor.fetchone() if region_row and region_row[0]: # Check if region_row is not None and if active message = format_message(data) undelivered = False for chat_id, username in results: app.logger.debug(f"Queueing message: {message} to chat_id={chat_id}, username={username}") try: send_to_queue({'chat_id': chat_id, 'message': message}) app.logger.info(f"Queued alert for {chat_id} ({username}) for region {region_id}") except Exception as e: app.logger.error(f"Failed to send message to {chat_id} ({username}): {e}") undelivered = True if undelivered: query = 'INSERT OR IGNORE INTO events (hash, data, delivered) VALUES (?, ?, ?)' app.logger.debug(f"Executing query: {query} with hash={event_hash}, data={data}, delivered={False}") cursor.execute(query, (event_hash, str(data), False)) conn.commit() conn.close() return jsonify({"status": "success"}), 200 except sqlite3.OperationalError as e: app.logger.error(f"Database operation error: {e}") return jsonify({"status": "error", "message": "Database operation error"}), 500 except ValueError as e: app.logger.error(f"Value error: {e}") return jsonify({"status": "error", "message": "Invalid data"}), 400 except Exception as e: app.logger.error(f"Unexpected error: {e}") return jsonify({"status": "error", "message": "Internal server error"}), 500 def format_message(data): try: status_emoji = "⚠️" if data['status'].upper() == "PROBLEM" else "✅" priority_map = { '4': 'Высокая', '5': 'Авария' } priority = priority_map.get(data['severity'], 'Неизвестно') if data['status'].upper() == "PROBLEM": message = ( f"⚠️ {data['host']} ({data["ip"]})\n" f"{data['msg']}\n" f"Критичность: {data['severity']}" ) if 'link' in data: message += f'\nURL: {data['link']}' return message else: message = ( f"✅ {data['host']} ({data["ip"]})\n" f"{data['msg']}\n" f"Критичность: {data['severity']}\n" f"Проблема устранена!\n" f"Время устранения проблемы: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))}" ) return message except KeyError as e: app.logger.error(f"Missing key in data: {e}") raise ValueError(f"Missing key in data: {e}") @app.route('/add_user', methods=['POST']) def add_user(): data = request.get_json() telegram_id = data.get('telegram_id') chat_id = data.get('chat_id') if telegram_id and chat_id: add_to_whitelist(chat_id, telegram_id) app.logger.info(f"User {telegram_id} added to whitelist.") return jsonify({"status": "success"}), 200 else: app.logger.error("Invalid data received for adding user.") return jsonify({"status": "failure", "reason": "Invalid data"}), 400 # Обработчик для переключения уровня логирования Flask @app.route('/debug/flask', methods=['POST']) def toggle_flask_debug(): try: data = request.get_json() level = data.get('level', 'DEBUG').upper() if level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']: return jsonify({'status': 'error', 'message': 'Invalid log level'}), 400 log_level = getattr(logging, level, logging.DEBUG) app.logger.setLevel(log_level) # Обновление уровня логирования для каждого обработчика for handler in app.logger.handlers: handler.setLevel(log_level) return jsonify({'status': 'success', 'level': level}) except Exception as e: return jsonify({'status': 'error', 'message': str(e)}), 500 # Обработчик для переключения уровня логирования Telebot @app.route('/debug/telebot', methods=['POST']) def toggle_telebot_debug(): try: data = request.get_json() level = data.get('level', 'DEBUG').upper() if level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']: return jsonify({'status': 'error', 'message': 'Invalid log level'}), 400 log_level = getattr(logging, level, logging.DEBUG) telebot.logger.setLevel(log_level) # Обновление уровня логирования для каждого обработчика for handler in telebot.logger.handlers: handler.setLevel(log_level) return jsonify({'status': 'success', 'level': level}) except Exception as e: return jsonify({'status': 'error', 'message': str(e)}), 500 # Handle active triggers def handle_active_triggers(message): chat_id = message.chat.id regions = get_user_subscribed_regions(chat_id) regions_per_page = 3 start_index = 0 markup = create_region_markup(regions, start_index, regions_per_page) bot.send_message(chat_id, "По какому региону хотите получить активные проблемы:", reply_markup=markup) def create_region_markup(regions, start_index, regions_per_page): markup = telebot.types.InlineKeyboardMarkup() end_index = min(start_index + regions_per_page, len(regions)) buttons = [] for i in range(start_index, end_index): region_id, region_name = regions[i] buttons.append(telebot.types.InlineKeyboardButton(text=region_id, callback_data=f"region_{region_id}")) prev_button = telebot.types.InlineKeyboardButton(text="<", callback_data=f"prev_{start_index}") if start_index > 0 else None next_button = telebot.types.InlineKeyboardButton(text=">", callback_data=f"next_{start_index}") if end_index < len(regions) else None row_buttons = [prev_button] + buttons + [next_button] row_buttons = [btn for btn in row_buttons if btn is not None] # Remove None values markup.row(*row_buttons) return markup @bot.callback_query_handler(func=lambda call: call.data.startswith("region_")) def handle_region_selection(call): region_id = call.data.split("_")[1] chat_id = call.message.chat.id try: # Получение всех групп хостов, содержащих region_id в названии zapi = ZabbixAPI(ZABBIX_URL) zapi.login(api_token=ZABBIX_API_TOKEN) host_groups = zapi.hostgroup.get( output=["groupid", "name"], search={"name": region_id} ) # Фильтрация групп хостов, исключая те, в названии которых есть "test" filtered_groups = [group for group in host_groups if 'test' not in group['name'].lower()] if not filtered_groups: bot.send_message(chat_id, "Нет групп хостов, соответствующих данному региону.") bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None) bot.answer_callback_query(call.id) return # Отправка списка групп хостов пользователю в виде кнопок markup = telebot.types.InlineKeyboardMarkup() for group in filtered_groups: markup.add(telebot.types.InlineKeyboardButton(text=group['name'], callback_data=f"group_{group['groupid']}")) bot.send_message(chat_id, f"Найдены следующие группы хостов для региона {region_id}:", reply_markup=markup) except Exception as e: telebot.logger.error(f"Error connecting to Zabbix API: {e}") bot.send_message(chat_id, "Не удалось подключиться к Zabbix API. Пожалуйста, попробуйте позже.") bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None) bot.answer_callback_query(call.id) @bot.callback_query_handler(func=lambda call: call.data.startswith("group_")) def handle_group_selection(call): group_id = call.data.split("_")[1] chat_id = call.message.chat.id try: # Получение триггеров для выбранной группы хостов triggers = get_zabbix_triggers(group_id) if triggers is None: bot.send_message(chat_id, "Не удалось подключиться к Zabbix API. Пожалуйста, попробуйте позже.") elif not triggers: bot.send_message(chat_id, "Нет активных проблем по указанной группе уровня HIGH и DISASTER за последние 24 часа.") else: for trigger in triggers: bot.send_message(chat_id, trigger, parse_mode="html") time.sleep(1/5) except Exception as e: telebot.logger.error(f"Error processing group selection: {e}") bot.send_message(chat_id, "Произошла ошибка при обработке вашего запроса. Пожалуйста, попробуйте позже.") bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None) bot.answer_callback_query(call.id) @bot.callback_query_handler(func=lambda call: call.data.startswith("prev_") or call.data.startswith("next_")) def handle_pagination(call): direction, index = call.data.split("_") index = int(index) regions = get_user_subscribed_regions(call.message.chat.id) regions_per_page = 3 if direction == "prev": start_index = max(0, index - regions_per_page) else: start_index = min(len(regions) - regions_per_page, index + regions_per_page) markup = create_region_markup(regions, start_index, regions_per_page) bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=markup) bot.answer_callback_query(call.id) # Завершение обработки callback def get_zabbix_triggers(group_id): try: # Создайте подключение к вашему Zabbix серверу zapi = ZabbixAPI(ZABBIX_URL) zapi.login(api_token=ZABBIX_API_TOKEN) # Получение триггеров уровня "Высокая" и "Авария" за последние 24 часа для указанной группы хостов time_from = int(time.time()) - 24 * 3600 # последние 24 часа time_till = int(time.time()) triggers = zapi.trigger.get( output=["triggerid", "description", "priority"], selectHosts=["hostid", "name"], groupids=group_id, filter={"priority": ["4", "5"], "value": "1"}, only_true=1, active=1, withLastEventUnacknowledged=1, time_from=time_from, time_till=time_till, expandDescription=1, expandComment=1, selectItems=["itemid", "lastvalue"], selectLastEvent=["clock"] ) # Московское время moskva_tz = timezone('Europe/Moscow') priority_map = { '4': 'Высокая', '5': 'Авария' } trigger_messages = [] current_time = datetime.now(moskva_tz) one_day_ago = current_time - timedelta(days=1) for trigger in triggers: # Получаем время последнего события event_time_epoch = int(trigger['lastEvent']['clock']) event_time = datetime.fromtimestamp(event_time_epoch, tz=moskva_tz) # Проверяем, не старше ли событие чем на сутки if event_time < one_day_ago: continue description = trigger['description'] host = trigger['hosts'][0]['name'] priority = priority_map.get(trigger['priority'], 'Неизвестно') # Генерация ссылки на график item_ids = [item['itemid'] for item in trigger['items']] graph_links = [] for item_id in item_ids: graph_link = f'Ссылка на график' graph_links.append(graph_link) graph_links_str = "\n".join(graph_links) # Заменяем {HOST.NAME} на имя хоста description = description.replace("{HOST.NAME}", host) # Заменяем {ITEM.LASTVALUE1} и другие на соответствующие значения for i, item in enumerate(trigger['items']): lastvalue_placeholder = f"{{ITEM.LASTVALUE{i + 1}}}" if lastvalue_placeholder in description: description = description.replace(lastvalue_placeholder, item['lastvalue']) event_time_formatted = event_time.strftime('%Y-%m-%d %H:%M:%S Мск') message = (f"Host: {host}\n" f"Критичность: {priority}\n" f"Описание: {description}\n" f"Время создания: {event_time_formatted}\n" f"{graph_links_str}") trigger_messages.append(message) return trigger_messages except Exception as e: telebot.logger.error(f"Error connecting to Zabbix API: {e}") return None async def send_group_messages(chat_id, messages): for message in messages: await send_message(chat_id, message, is_notification=True) await asyncio.sleep(1) # Delay between messages to comply with rate limit # Test functions for admin def simulate_event(message): chat_id = message.chat.id test_event = { "host": "12", "msg": "Тестовое сообщение", "date_reception": "12757175125", "severity": "5", "tags": "OTC,OFS,NFS", "status": "Авария!" } app.logger.info(f"Simulating event: {test_event}") # Use requests to simulate a POST request response = requests.post('http://localhost:5000/webhook', json=test_event) app.logger.info(f"Response from webhook: {response.status_code} - {response.text}") bot.send_message(chat_id, f"Тестовое событие отправлено. Статус ответа: {response.status_code}") def simulate_triggers(message): chat_id = message.chat.id regions = ["12", "19", "35", "40"] trigger_messages = [] for region_id in regions: triggers = get_zabbix_triggers(region_id) if triggers: trigger_messages.append(f"Регион {region_id}:\n{triggers}") if trigger_messages: bot.send_message(chat_id, "\n\n".join(trigger_messages), parse_mode="html") else: bot.send_message(chat_id, "Нет активных проблем по указанным регионам за последние 24 часа.") def run_polling(): bot.polling(non_stop=True, interval=0) # Запуск Flask-приложения def run_flask(): app.run(port=5000, host='0.0.0.0', debug=True, use_reloader=False) def schedule_jobs(): schedule.every().day.at("00:00").do(archive_old_logs) while True: schedule.run_pending() time.sleep(60) # Проверять раз в минуту # Основная функция для запуска def main(): # Инициализация базы данных init_db() print('Bootstrap wait...') # Запуск Flask и бота в отдельных потоках Thread(target=run_flask, daemon=True).start() Thread(target=run_polling, daemon=True).start() # Запуск планировщика задач в отдельном потоке Thread(target=schedule_jobs, daemon=True).start() # Запуск асинхронных задач asyncio.run(consume_from_queue()) if __name__ == '__main__': main()