import os
from functools import partial
from gc import callbacks
from flask import Flask, request, jsonify, render_template
import schedule
from dotenv import load_dotenv
import hashlib
import telebot
from telebot import types
import logging
from logging.config import dictConfig
import zipfile
from threading import Thread, Lock
import sqlite3
import time
import asyncio
import aiohttp
import pika
import json
from concurrent.futures import ThreadPoolExecutor
from pyzabbix import ZabbixAPI
import requests
from pytz import timezone
from datetime import datetime, timedelta
import re
import urllib.parse
from log_manager import LogManager
from region_api import RegionAPI
from user_state_manager import UserStateManager
# Load environment variables
load_dotenv()
# Функция для загрузки значения из файла
def load_value_from_file(file_name):
try:
with open(file_name, 'r') as file:
return file.read().strip()
except FileNotFoundError:
return None
# Функция для получения переменной из окружения или файла
def get_variable_value(variable_name):
# Попытка получить значение из окружения
value = os.getenv(variable_name)
# Если переменная окружения не установлена, попробуем загрузить из файла
if not value:
file_value = "file_" + variable_name
value = os.getenv(file_value)
with open(value, 'r') as file:
value = file.read()
return value
return value
#
DEV = get_variable_value('DEV')
# Загрузка переменных окружения или значений из файлов
TOKEN = get_variable_value('TELEGRAM_TOKEN')
ZABBIX_API_TOKEN = get_variable_value('ZABBIX_API_TOKEN')
ZABBIX_URL = get_variable_value('ZABBIX_URL')
DB_PATH = 'db/telezab.db'
SUPPORT_EMAIL = "shiftsupport-rtmis@rtmis.ru"
BASE_URL = '/telezab'
# Инициализируем класс RegionApi
region_api = RegionAPI(DB_PATH)
# Инициализируем класс UserStateManager
user_state_manager = UserStateManager()
# Initialize Flask application
app = Flask(__name__, template_folder='templates')
# Инициализация LogManager
log_manager = LogManager(log_dir='logs', retention_days=30)
# Настройка уровня логирования для Flask
app.logger.setLevel(logging.INFO)
# Настройка pyTelegramBotAPI logger
telebot.logger = logging.getLogger('telebot')
# Важно: вызов schedule_log_rotation для планировки ротации и архивации логов
log_manager.schedule_log_rotation()
bot = telebot.TeleBot(TOKEN)
# Lock for database operations
db_lock = Lock()
# Semaphore for rate limiting
rate_limit_semaphore = asyncio.Semaphore(25) # 25 messages per second
def init_db():
try:
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Create events table
cursor.execute('''CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash TEXT UNIQUE,
data TEXT,
delivered BOOLEAN)''')
# Create subscriptions table with username and active flag
cursor.execute('''CREATE TABLE IF NOT EXISTS subscriptions (
chat_id INTEGER,
region_id TEXT,
username TEXT,
active BOOLEAN DEFAULT TRUE,
skip BOOLEAN DEFAULT FALSE,
disaster_only BOOLEAN DEFAULT FALSE,
UNIQUE(chat_id, region_id))''')
# Create whitelist table
cursor.execute('''CREATE TABLE IF NOT EXISTS whitelist (
chat_id INTEGER PRIMARY KEY,
username TEXT,
user_email TEXT)''')
# Create whitelist table
cursor.execute('''CREATE TABLE IF NOT EXISTS admins (
chat_id INTEGER PRIMARY KEY,
username TEXT)''')
# Create regions table with active flag
cursor.execute('''CREATE TABLE IF NOT EXISTS regions (
region_id TEXT PRIMARY KEY,
region_name TEXT,
active BOOLEAN DEFAULT TRUE)''')
# Create user events table for logging
cursor.execute('''CREATE TABLE IF NOT EXISTS user_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER,
username TEXT,
action TEXT,
timestamp TEXT)''')
# Insert sample regions
cursor.execute('''INSERT OR IGNORE INTO regions (region_id, region_name) VALUES
('01', 'Адыгея'),
('02', 'Башкортостан (Уфа)'),
('04', 'Алтай'),
('19', 'Республика Хакасия')''')
conn.commit()
app.logger.info("Database initialized successfully.")
except Exception as e:
app.logger.error(f"Error initializing database: {e}")
finally:
conn.close()
# Hash the incoming data
def hash_data(data):
return hashlib.sha256(str(data).encode('utf-8')).hexdigest()
# Check if user is in whitelist
def is_whitelisted(chat_id):
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
query = 'SELECT COUNT(*) FROM whitelist WHERE chat_id = ?'
telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}")
cursor.execute(query, (chat_id,))
count = cursor.fetchone()[0]
conn.close()
return count > 0
# Add user to whitelist
def add_to_whitelist(chat_id, username):
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
query = 'INSERT OR IGNORE INTO whitelist (chat_id, username) VALUES (?, ?)'
telebot.logger.info(f"Executing query: {query} with chat_id={chat_id}, username={username}")
try:
cursor.execute(query, (chat_id, username))
conn.commit()
except Exception as e:
telebot.logger.error(f"Error during add to whitelist: {e}")
finally:
conn.close()
def rundeck_add_to_whitelist(chat_id, username, user_email):
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Проверка существования chat_id
check_query = 'SELECT COUNT(*) FROM whitelist WHERE chat_id = ?'
cursor.execute(check_query, (chat_id,))
count = cursor.fetchone()[0]
if count > 0:
conn.close()
return False # Пользователь уже существует
# Вставка нового пользователя
insert_query = 'INSERT INTO whitelist (chat_id, username, user_email) VALUES (?, ?, ?)'
telebot.logger.info(f"Rundeck executing query: {insert_query} with chat_id={chat_id}, username={username}, email={user_email}")
cursor.execute(insert_query, (chat_id, username, user_email))
conn.commit()
conn.close()
return True # Успешное добавление
# Remove user from whitelist
def remove_from_whitelist(chat_id):
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
query = 'DELETE FROM whitelist WHERE chat_id = ?'
telebot.logger.info(f"Executing query: {query} with chat_id={chat_id}")
cursor.execute(query, (chat_id,))
conn.commit()
conn.close()
def get_admins():
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT chat_id FROM admins')
admins = cursor.fetchall()
admins = [i[0] for i in admins]
conn.close()
return admins
# # Get list of regions
# def get_regions():
# with db_lock:
# conn = sqlite3.connect(DB_PATH)
# cursor = conn.cursor()
# cursor.execute('SELECT region_id, region_name FROM regions WHERE active = TRUE ORDER BY region_id')
# regions = cursor.fetchall()
# conn.close()
# return regions
def get_sorted_regions():
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT region_id, region_name FROM regions WHERE active = TRUE')
regions = cursor.fetchall()
conn.close()
# Сортируем регионы по числовому значению region_id
regions.sort(key=lambda x: int(x[0]))
return regions
# Check if region exists
def region_exists(region_id):
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM regions WHERE region_id = ? AND active = TRUE', (region_id,))
count = cursor.fetchone()[0]
conn.close()
return count > 0
# Get list of regions a user is subscribed to
def get_user_subscribed_regions(chat_id):
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
SELECT regions.region_id, regions.region_name
FROM subscriptions
JOIN regions ON subscriptions.region_id = regions.region_id
WHERE subscriptions.chat_id = ? AND subscriptions.active = TRUE AND subscriptions.skip = FALSE
ORDER BY regions.region_id
''', (chat_id,))
regions = cursor.fetchall()
conn.close()
return regions
# Check if user is subscribed to a region
def is_subscribed(chat_id, region_id):
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute('''
SELECT COUNT(*)
FROM subscriptions
WHERE chat_id = ? AND region_id = ? AND active = TRUE AND skip = FALSE
''', (chat_id, region_id))
count = cursor.fetchone()[0]
conn.close()
return count > 0
# Format regions list
def format_regions_list(regions):
return '\n'.join([f"{region_id} - {region_name}" for region_id, region_name in regions])
def log_user_event(chat_id, username, action):
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
try:
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
query = 'INSERT INTO user_events (chat_id, username, action, timestamp) VALUES (?, ?, ?, ?)'
telebot.logger.debug(f"Executing query: {query} with chat_id={chat_id}, username={username}, action={action}, timestamp={timestamp}")
cursor.execute(query, (chat_id, username, action, timestamp))
conn.commit()
telebot.logger.info(f"User event logged: {chat_id} ({username}) - {action} at {timestamp}.")
except Exception as e:
telebot.logger.error(f"Error logging user event: {e}")
finally:
conn.close()
# Define states
NOTIFICATION_MODE = 1
SETTINGS_MODE = 2
# Handle /help command to provide instructions
@bot.message_handler(commands=['help'])
def handle_help(message):
chat_id = message.chat.id
if not is_whitelisted(chat_id):
bot.send_message(chat_id, "Вы неавторизованы для использования этого бота.")
return
help_text = (
'/start - Показать меню бота\n'
'Настройки - Перейти в режим настроек и управления подписками\n'
'Активные события - Получение всех нерешённых событий мониторинга по выбранным сервисам в выбранного региона\n'
'Помощь - Описание всех возможностей бота'
)
bot.send_message(message.chat.id, help_text, parse_mode="html")
show_main_menu(message.chat.id)
# Handle /register command for new user registration
def handle_register(message):
chat_id = message.chat.id
username = message.from_user.username
if username:
username = f"@{username}"
else:
username = "N/A"
text = (f'Для продолжения регистрации необходимо отправить с корпоративного почтового адреса "РТ МИС" письмо на адрес {SUPPORT_EMAIL}\n'
f'В теме письма указать "Подтверждение регистрации в телеграм-боте TeleZab".\n'
f'В теле письма указать:\n'
f'1. ФИО\n'
f'2. Ваш Chat ID: {chat_id}\n'
f'3. Ваше имя пользователя: {username}')
bot.send_message(chat_id,text,parse_mode="HTML")
log_user_event(chat_id, username, "Requested registration")
# Handle /start command
@bot.message_handler(commands=['start'])
def handle_start(message):
show_main_menu(message.chat.id)
def show_main_menu(chat_id):
markup = telebot.types.ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True)
if is_whitelisted(chat_id):
user_state_manager.set_state(chat_id, "MAIN_MENU")
markup.add('Настройки', 'Помощь', 'Активные события')
else:
user_state_manager.set_state(chat_id, "REGISTRATION")
markup.add('Регистрация')
bot.send_message(chat_id, "Выберите действие:", reply_markup=markup)
def create_settings_keyboard(chat_id, admins_list):
markup = telebot.types.ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True)
# Линия 1: "Подписаться", "Отписаться"
markup.row('Подписаться','Отписаться')
markup.row('Мои подписки')
markup.row('Режим уведомлений')
if DEV == '1':
if chat_id in admins_list:
markup.row('Активные регионы')
markup.row('Добавить регион', 'Удалить регион')
markup.row('Назад')
return markup
# Settings menu for users
def show_settings_menu(chat_id):
if not is_whitelisted(chat_id):
user_state_manager.set_state(chat_id, "REGISTRATION")
bot.send_message(chat_id, "Вы неавторизованы для использования этого бота")
return
admins_list = get_admins()
markup = create_settings_keyboard(chat_id, admins_list)
bot.send_message(chat_id, "Вы находитесь в режиме настроек. Выберите действие:", reply_markup=markup)
# Основной обработчик меню
@bot.message_handler(func=lambda message: True)
def handle_menu_selection(message):
chat_id = message.chat.id
text = message.text.strip()
username = message.from_user.username
# Проверка авторизации
if not is_whitelisted(chat_id) and text != 'Регистрация':
bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.")
return
# Получаем текущее состояние пользователя
current_state = user_state_manager.get_state(chat_id)
# Обработка команд в зависимости от состояния
if current_state == "MAIN_MENU":
handle_main_menu(message, chat_id, text)
elif current_state == "REGISTRATION":
handle_register(message)
elif current_state == "SETTINGS_MENU":
handle_settings_menu(message, chat_id, text)
elif current_state == "SUBSCRIBE":
process_subscription_button(message, chat_id, username)
elif current_state == "UNSUBSCRIBE":
process_unsubscription_button(message, chat_id, username)
# elif current_state == "ADD_REGION":
# process_add_region_button(message, chat_id, username)
# elif current_state == "REMOVE_REGION":
# process_remove_region_button(message, chat_id, username)
else:
bot.send_message(chat_id, "Команда не распознана.")
show_main_menu(chat_id)
def handle_main_menu(message, chat_id, text):
"""Обработка команд в главном меню."""
if text == 'Регистрация':
user_state_manager.set_state(chat_id, "REGISTRATION")
handle_register(message)
elif text == 'Настройки':
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
elif text == 'Помощь':
handle_help(message)
elif text == 'Активные события':
handle_active_triggers(message)
else:
bot.send_message(chat_id, "Команда не распознана.")
show_main_menu(chat_id)
def handle_settings_menu(message, chat_id, text):
"""Обработка команд в меню настроек."""
admins_list = get_admins()
if text.lower() == 'подписаться':
user_state_manager.set_state(chat_id, "SUBSCRIBE")
handle_subscribe_button(message)
elif text.lower() == 'отписаться':
user_state_manager.set_state(chat_id, "UNSUBSCRIBE")
handle_unsubscribe_button(message)
elif text.lower() == 'мои подписки':
handle_my_subscriptions_button(message)
elif text.lower() == 'активные регионы':
handle_active_regions_button(message)
elif text.lower() == "режим уведомлений":
handle_notification_mode_button(message)
elif text.lower() == 'добавить регион' and chat_id in admins_list:
user_state_manager.set_state(chat_id, "ADD_REGION")
handle_region_manager(chat_id, 'add')
elif text.lower() == 'удалить регион' and chat_id in admins_list:
user_state_manager.set_state(chat_id, "REMOVE_REGION")
handle_region_manager(chat_id, 'remove')
elif text.lower() == 'назад':
user_state_manager.set_state(chat_id, "MAIN_MENU")
show_main_menu(chat_id)
else:
bot.send_message(chat_id, "Команда не распознана.")
show_settings_menu(chat_id)
def handle_subscribe_button(message):
chat_id = message.chat.id
if not is_whitelisted(chat_id):
bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.")
return
username = message.from_user.username
if username:
username = f"@{username}"
else:
username = "N/A"
regions_list = format_regions_list(get_sorted_regions())
markup = telebot.types.InlineKeyboardMarkup()
markup.add(telebot.types.InlineKeyboardButton(text="Отмена",
callback_data=f"cancel_action"))
bot.send_message(chat_id, f"Отправьте номера регионов через запятую:\n{regions_list}\n",reply_markup=markup)
# Сохраняем ID сообщения с клавиатурой для последующего редактирования
# user_state_manager.set_state(chat_id, "WAITING_FOR_INPUT",
# extra_data={"cancel_message_id": sent_message.message_id})
bot.register_next_step_handler_by_chat_id(chat_id, process_subscription_button, chat_id, username)
def process_subscription_button(message, chat_id, username):
subbed_regions = []
invalid_regions = []
if message.text.lower() == 'отмена':
bot.send_message(chat_id, "Действие отменено.")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
return show_settings_menu(chat_id)
if not all(part.strip().isdigit() for part in message.text.split(',')):
markup = telebot.types.InlineKeyboardMarkup()
markup.add(telebot.types.InlineKeyboardButton(text="Отмена",
callback_data=f"cancel_action"))
bot.send_message(chat_id, "Неверный формат данных. Введите номер или номера регионов через запятую.", reply_markup=markup)
# # Сохраняем ID сообщения с клавиатурой для последующего редактирования
# user_state_manager.set_state(chat_id, "WAITING_FOR_INPUT", extra_data={"cancel_message_id": sent_message.message_id})
bot.register_next_step_handler_by_chat_id(chat_id, process_subscription_button, chat_id, username)
return
region_ids = message.text.split(',')
valid_region_ids = [region[0] for region in get_sorted_regions()]
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
for region_id in region_ids:
region_id = region_id.strip()
if region_id not in valid_region_ids:
invalid_regions.append(region_id)
continue
cursor.execute('INSERT OR IGNORE INTO subscriptions (chat_id, region_id, username, active) VALUES (?, ?, ?, TRUE)',
(chat_id, region_id, username))
if cursor.rowcount == 0:
cursor.execute('UPDATE subscriptions SET active = TRUE WHERE chat_id = ? AND region_id = ?', (chat_id, region_id))
subbed_regions.append(region_id)
conn.commit()
# # Получаем ID сообщения с клавиатурой "Отмена" и скрываем её
# state_data = user_state_manager.get_state(chat_id)
# cancel_message_id = state_data.get("cancel_message_id")
#
# if cancel_message_id:
# try:
# bot.edit_message_reply_markup(chat_id, cancel_message_id, reply_markup=None)
# except telebot.apihelper.ApiTelegramException as e:
# telebot.logger.error(f"Failed to edit message: {e}")
if len(invalid_regions) > 0:
bot.send_message(chat_id, f"Регион с ID {', '.join(invalid_regions)} не существует. Введите корректные номера или 'отмена'.")
bot.send_message(chat_id, f"Подписка на регионы: {', '.join(subbed_regions)} оформлена.")
log_user_event(chat_id, username, f"Subscribed to regions: {', '.join(subbed_regions)}")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
def handle_unsubscribe_button(message):
chat_id = message.chat.id
if not is_whitelisted(chat_id):
bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.")
telebot.logger.info(f"Unauthorized access attempt by {chat_id}")
user_state_manager.set_state(chat_id, "REGISTRATION")
return show_main_menu(chat_id)
username = message.from_user.username
if username:
username = f"@{username}"
else:
username = "N/A"
# Получаем список подписок пользователя
user_regions = get_user_subscribed_regions(chat_id)
if not user_regions:
bot.send_message(chat_id, "Вы не подписаны ни на один регион.")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
return show_settings_menu(chat_id)
regions_list = format_regions_list(user_regions)
markup = telebot.types.InlineKeyboardMarkup()
markup.add(telebot.types.InlineKeyboardButton(text="Отмена",
callback_data=f"cancel_action"))
bot.send_message(chat_id, f"Отправьте номер или номера регионов, от которых хотите отписаться (через запятую):\n{regions_list}\n",reply_markup=markup)
# user_state_manager.set_state(chat_id, "WAITING_FOR_INPUT",
# extra_data={"cancel_message_id": sent_message.message_id})
bot.register_next_step_handler_by_chat_id(chat_id, process_unsubscription_button, chat_id, username)
def process_unsubscription_button(message, chat_id, username):
unsubbed_regions = []
invalid_regions = []
markup = telebot.types.InlineKeyboardMarkup()
markup.add(telebot.types.InlineKeyboardButton(text="Отмена",
callback_data=f"cancel_action"))
if message.text.lower() == 'отмена':
bot.send_message(chat_id, "Действие отменено.")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
return show_settings_menu(chat_id)
# Проверка, что введённая строка содержит только цифры и запятые
if not all(part.strip().isdigit() for part in message.text.split(',')):
bot.send_message(chat_id, "Некорректный формат. Введите номера регионов через запятую.", reply_markup=markup)
# user_state_manager.set_state(chat_id, "WAITING_FOR_INPUT",
# extra_data={"cancel_message_id": sent_message.message_id})
bot.register_next_step_handler_by_chat_id(chat_id, process_unsubscription_button, chat_id, username)
return
region_ids = message.text.split(',')
valid_region_ids = [region[0] for region in get_user_subscribed_regions(chat_id)]
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
for region_id in region_ids:
region_id = region_id.strip()
if region_id not in valid_region_ids:
invalid_regions.append(region_id)
continue
# Удаление подписки
query = 'UPDATE subscriptions SET active = FALSE WHERE chat_id = ? AND region_id = ?'
cursor.execute(query, (chat_id, region_id))
unsubbed_regions.append(region_id)
conn.commit()
# # Получаем ID сообщения с клавиатурой "Отмена" и скрываем её
# state_data = user_state_manager.get_state(chat_id)
# cancel_message_id = state_data.get_extra_data(chat_id)
# if cancel_message_id:
# try:
# bot.edit_message_reply_markup(chat_id, cancel_message_id, reply_markup=None)
# except telebot.apihelper.ApiTelegramException as e:
# telebot.logger.error(f"Failed to edit message: {e}")
if len(invalid_regions) > 0:
bot.send_message(chat_id, f"Регион с ID {', '.join(invalid_regions)} не найден в ваших подписках.")
bot.send_message(chat_id, f"Отписка от регионов: {', '.join(unsubbed_regions)} выполнена.")
log_user_event(chat_id, username, f"Unsubscribed from regions: {', '.join(unsubbed_regions)}")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
@bot.callback_query_handler(func=lambda call: call.data == "cancel_action")
def handle_cancel_action(call):
chat_id = call.message.chat.id
message_id = call.message.message_id
bot.clear_step_handler_by_chat_id(chat_id)
bot.send_message(chat_id,f"Действие отменено")
bot.edit_message_reply_markup(chat_id,message_id,reply_markup=None)
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
return
@bot.callback_query_handler(func=lambda call: call.data == "cancel_active_triggers")
def handle_cancel_active_triggers(call):
chat_id = call.message.chat.id
message_id = call.message.message_id
bot.clear_step_handler_by_chat_id(chat_id)
bot.send_message(chat_id,f"Действие отменено")
bot.edit_message_reply_markup(chat_id,message_id,reply_markup=None)
user_state_manager.set_state(chat_id, "MAIN_MENU")
show_main_menu(chat_id)
return
######################################################################################################################
## help_Region_Manager
##
######################################################################################################################
# Handle admin region management commands
def handle_region_manager(chat_id: int, action: str):
if action == 'add':
bot.send_message(chat_id, "Введите ID и название региона в формате:\n ")
bot.register_next_step_handler_by_chat_id(chat_id, process_add_region)
elif action == 'remove':
bot.send_message(chat_id, "Введите ID региона, который хотите сделать неактивным")
bot.register_next_step_handler_by_chat_id(chat_id, process_remove_region)
######################################################################################################################
## help_Region_Manager
##
######################################################################################################################
class RegionManager:
def __init__(self, db_path):
self.db_path = db_path
def add_region(self, region_id: int, region_name: str):
with db_lock, sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Проверяем наличие региона
cursor.execute('SELECT region_name, active FROM regions WHERE region_id = ?', (region_id,))
result = cursor.fetchone()
if result:
existing_region_name, active = result
if existing_region_name == region_name:
# Регион уже существует с этим именем
cursor.execute('UPDATE regions SET active = TRUE WHERE region_id = ?', (region_id,))
conn.commit()
return "activated", existing_region_name
else:
return "exists", existing_region_name
else:
# Добавляем новый регион
cursor.execute('INSERT OR IGNORE INTO regions (region_id, region_name) VALUES (?, ?)', (region_id, region_name))
conn.commit()
return "added", region_name
def remove_region(self, region_id: int):
with db_lock, sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Проверяем, существует ли регион
cursor.execute('SELECT COUNT(*) FROM regions WHERE region_id = ?', (region_id,))
count = cursor.fetchone()[0]
if count == 0:
return False # Регион не найден
# Деактивируем регион и обновляем подписки
cursor.execute('UPDATE regions SET active = FALSE WHERE region_id = ?', (region_id,))
cursor.execute('UPDATE subscriptions SET active = FALSE WHERE region_id = ? AND active = TRUE', (region_id,))
conn.commit()
return True
def log_event(self, chat_id: int, username: str, action: str):
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
with db_lock, sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
query = 'INSERT INTO user_events (chat_id, username, action, timestamp) VALUES (?, ?, ?, ?)'
cursor.execute(query, (chat_id, username, action, timestamp))
conn.commit()
region_manager = RegionManager(DB_PATH)
def process_add_region(message):
chat_id = message.chat.id
username = f"@{message.from_user.username}" if message.from_user.username else "N/A"
try:
parts = message.text.split()
if len(parts) < 2:
raise ValueError("Неверный формат")
region_id, region_name = parts[0], ' '.join(parts[1:])
status, existing_region_name = region_manager.add_region(region_id, region_name)
if status == "activated":
bot.send_message(chat_id, f"Регион {region_id} - {region_name} активирован.")
region_manager.log_event(chat_id, username, f"Admin reactivated region {region_id} - {region_name}")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
elif status == "added":
bot.send_message(chat_id, f"Регион {region_id} - {region_name} добавлен.")
region_manager.log_event(chat_id, username, f"Admin added region {region_id} - {region_name}")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
elif status == "exists":
markup = telebot.types.InlineKeyboardMarkup()
markup.add(telebot.types.InlineKeyboardButton(text="Заменить",
callback_data=f"replace_{region_id}_{urllib.parse.quote(region_name)}"))
markup.add(
telebot.types.InlineKeyboardButton(text="Активировать старый", callback_data=f"reactivate_{region_id}"))
markup.add(telebot.types.InlineKeyboardButton(text="Отмена", callback_data=f"cancel_region_{chat_id}"))
bot.send_message(chat_id,
f"Регион {region_id} уже существует с названием {existing_region_name}. Хотите заменить его или активировать старый регион?",
reply_markup=markup)
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
except (IndexError, ValueError):
bot.send_message(chat_id, "Неверный формат. Используйте: ")
except Exception as e:
telebot.logger.error(f"Unexpected error: {e}")
bot.send_message(chat_id, "Произошла ошибка при обработке запроса.")
@bot.callback_query_handler(func=lambda call: call.data.startswith("replace_") or call.data.startswith(
"reactivate_") or call.data.startswith("cancel_region_"))
def handle_region_action(call):
parts = call.data.split("_", 2)
action = parts[0]
region_id = parts[1]
region_name = parts[2] if len(parts) > 2 else None
chat_id = call.message.chat.id
username = f"@{call.message.from_user.username}" if call.message.from_user.username else "N/A"
if action == "replace":
if region_name:
region_name = urllib.parse.unquote(region_name)
region_manager.add_region(region_id, region_name)
bot.send_message(chat_id, f"Регион {region_id} обновлен до {region_name} и активирован.")
region_manager.log_event(chat_id, username,
f"Admin replaced and reactivated region {region_id} - {region_name}")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
elif action == "reactivate":
region_manager.add_region(region_id, region_name)
bot.send_message(chat_id, f"Регион {region_id} активирован.")
region_manager.log_event(chat_id, username, f"Admin reactivated region {region_id} - {region_name}")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
elif action == "cancel_region":
bot.send_message(chat_id, "Действие отменено.")
telebot.logger.info(f"Admin {username} canceled region action.")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
bot.edit_message_reply_markup(chat_id=chat_id, message_id=call.message.message_id, reply_markup=None)
bot.answer_callback_query(call.id)
return show_settings_menu(chat_id)
def process_remove_region(message):
chat_id = message.chat.id
username = f"@{message.from_user.username}" if message.from_user.username else "N/A"
try:
region_id = message.text.split()[0]
success = region_manager.remove_region(region_id)
if success:
bot.send_message(chat_id, f"Регион {region_id} теперь неактивен, и все активные подписки обновлены.")
region_manager.log_event(chat_id, username, f"Admin {username} deactivated region {region_id}")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
else:
bot.send_message(chat_id, f"Регион с ID {region_id} не существует.")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
except IndexError:
bot.send_message(chat_id, "Неверный формат. Используйте: ")
######################################################################################################################
##
##
######################################################################################################################
# Handle displaying active subscriptions for a user
def handle_my_subscriptions_button(message):
chat_id = message.chat.id
if not is_whitelisted(chat_id):
bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.")
telebot.logger.info(f"Unauthorized access attempt by {chat_id}")
return
user_regions = get_user_subscribed_regions(chat_id)
if not user_regions:
bot.send_message(chat_id, "Вы не подписаны ни на один регион.")
else:
user_regions.sort(key=lambda x: int(x[0])) # Сортировка по числовому значению region_id
regions_list = format_regions_list(user_regions)
bot.send_message(chat_id, f"Ваши активные подписки:\n{regions_list}")
show_settings_menu(chat_id)
# Handle displaying all active regions
def handle_active_regions_button(message):
chat_id = message.chat.id
if not is_whitelisted(chat_id):
bot.send_message(chat_id, "Вы не авторизованы для использования этого бота.")
telebot.logger.info(f"Unauthorized access attempt by {chat_id}")
return
regions = get_sorted_regions() # Используем функцию для получения отсортированных регионов
if not regions:
bot.send_message(chat_id, "Нет активных регионов.")
else:
regions_list = format_regions_list(regions)
bot.send_message(chat_id, f"Активные регионы:\n{regions_list}")
show_settings_menu(chat_id)
# RabbitMQ configuration
RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost')
RABBITMQ_QUEUE = 'telegram_notifications'
def rabbitmq_connection():
# Создаем объект учетных данных
credentials = pika.PlainCredentials('admin', 'admin')
# Указываем параметры подключения, включая учетные данные
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
credentials=credentials, # Передаем учетные данные
heartbeat=600, # Интервал heartbeat для поддержания соединения
blocked_connection_timeout=300 # Таймаут блокировки соединения
)
# Создаем подключение и канал
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
return connection, channel
def send_to_queue(message):
connection, channel = rabbitmq_connection()
channel.basic_publish(
exchange='',
routing_key=RABBITMQ_QUEUE,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
connection.close()
async def consume_from_queue():
connection, channel = rabbitmq_connection()
for method_frame, properties, body in channel.consume(RABBITMQ_QUEUE):
message = json.loads(body)
chat_id = message['chat_id']
username = message['username']
message_text = message['message']
try:
await send_notification_message(chat_id, message_text, username)
channel.basic_ack(method_frame.delivery_tag)
except Exception as e:
telebot.logger.error(f"Error sending message from queue: {e}")
# Optionally, you can nack the message to requeue it
# channel.basic_nack(method_frame.delivery_tag)
connection.close()
async def send_message(chat_id, message, is_notification=False):
try:
if is_notification:
await rate_limit_semaphore.acquire()
parse_mode = 'HTML'
# Используем partial для передачи именованных аргументов в bot.send_message
func_with_args = partial(bot.send_message, chat_id=chat_id, text=message, parse_mode=parse_mode)
# Передаем подготовленную функцию в run_in_executor
await run_in_executor(func_with_args)
except telebot.apihelper.ApiTelegramException as e:
if "429" in str(e):
telebot.logger.warning(f"Rate limit exceeded for chat_id {chat_id}. Retrying...")
await asyncio.sleep(1)
await send_message(chat_id, message, is_notification)
else:
telebot.logger.error(f"Failed to send message to {chat_id}: {e}")
telebot.logger.error(f"Detailed Error: {e}", exc_info=True) # Добавлено логирование исключения
except Exception as e:
username = f"@{message.from_user.username}" if message.from_user.username else "N/A"
telebot.logger.error(f"Unexpected error while sending message to {username} {chat_id}: {e}", exc_info=True)
await check_telegram_api()
finally:
if is_notification:
rate_limit_semaphore.release()
async def send_notification_message(chat_id, message, username):
await send_message(chat_id, message, is_notification=True)
formatted_message = message.replace('\n', ' ').replace('\r', '')
telebot.logger.info(f'Send notification to {username} {chat_id} from RabbitMQ [{formatted_message}]')
async def run_in_executor(func, *args):
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as pool:
return await loop.run_in_executor(pool, func, *args)
async def check_telegram_api():
try:
async with aiohttp.ClientSession() as session:
async with session.get('https://api.telegram.org') as response:
if response.status == 200:
telebot.logger.info("Telegram API is reachable.")
else:
telebot.logger.error("Telegram API is not reachable.")
except Exception as e:
telebot.logger.error(f"Error checking Telegram API: {e}")
def extract_region_number(host):
# Используем регулярное выражение для извлечения цифр после первого символа и до первой буквы
match = re.match(r'^.\d+', host)
if match:
return match.group(0)[1:] # Возвращаем строку без первого символа
return None
def handle_notification_mode_button(message):
chat_id = message.chat.id
if not is_whitelisted(chat_id):
bot.send_message(chat_id, "Вы неавторизованы для использования этого бота")
return
markup = types.InlineKeyboardMarkup()
markup.add(types.InlineKeyboardButton(text="Критические события", callback_data="notification_mode_disaster"))
markup.add(types.InlineKeyboardButton(text="Все события", callback_data="notification_mode_all"))
bot.send_message(chat_id, "Выберите уровень событий мониторинга, уведомление о которых хотите получать:\n"
'1. Критические события (приоритет "DISASTER") - события, являющиеся потенциальными авариями и требующие оперативного решения.\nВ Zabbix обязательно имеют тег "CALL" для оперативного привлечения инженеров к устранению.\n\n'
'2. Все события (По умолчанию) - критические события, а также события Zabbix высокого ("HIGH") приоритета, имеющие потенциально значительное влияние на сервис и требующее устранение в плановом порядке.',
reply_markup=markup,parse_mode="HTML")
@bot.callback_query_handler(func=lambda call: call.data.startswith("notification_mode_"))
def handle_notification_mode_selection(call):
chat_id = call.message.chat.id
message_id = call.message.message_id
mode = call.data.split("_")[2]
# Убираем клавиатуру
bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=None)
# Обновляем режим уведомлений
disaster_only = True if mode == "disaster" else False
try:
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
query = 'UPDATE subscriptions SET disaster_only = ? WHERE chat_id = ?'
cursor.execute(query, (disaster_only, chat_id))
conn.commit()
mode_text = "Критические события" if disaster_only else "Все события"
bot.send_message(chat_id, f"Режим уведомлений успешно изменён на: {mode_text}")
user_state_manager.set_state(chat_id, "SETTINGS_MENU")
show_settings_menu(chat_id)
except Exception as e:
telebot.logger.error(f"Error updating notification mode for {chat_id}: {e}")
bot.send_message(chat_id, "Произошла ошибка при изменении режима уведомлений.")
finally:
conn.close()
bot.answer_callback_query(call.id)
# Фаза 1: Запрос активных событий и выбор региона с постраничным переключением
def handle_active_triggers(message):
chat_id = message.chat.id
regions = get_sorted_regions() # Используем функцию get_regions для получения регионов
start_index = 0
markup = create_region_keyboard(regions, start_index)
bot.send_message(chat_id, "Выберите регион для получения активных событий:", reply_markup=markup)
def create_region_keyboard(regions, start_index, regions_per_page=10):
markup = types.InlineKeyboardMarkup()
end_index = min(start_index + regions_per_page, len(regions))
# Создаём кнопки для регионов
for i in range(start_index, end_index):
region_id, region_name = regions[i]
button = types.InlineKeyboardButton(text=f"{region_id}: {region_name}", callback_data=f"region_{region_id}")
markup.add(button)
# Добавляем кнопки для переключения страниц
navigation_row = []
if start_index > 0:
navigation_row.append(types.InlineKeyboardButton(text="<", callback_data=f"prev_{start_index}"))
if end_index < len(regions):
navigation_row.append(types.InlineKeyboardButton(text=">", callback_data=f"next_{end_index}"))
if navigation_row:
markup.row(*navigation_row)
markup.row(types.InlineKeyboardButton(text='Отмена', callback_data='cancel_active_triggers'))
return markup
@bot.callback_query_handler(
func=lambda call: call.data.startswith("region_") or call.data.startswith("prev_") or call.data.startswith(
"next_"))
def handle_region_pagination(call):
chat_id = call.message.chat.id
message_id = call.message.message_id
data = call.data
regions = get_sorted_regions() # Используем функцию get_regions для получения регионов
regions_per_page = 10
# Если был выбран регион, то убираем клавиатуру и продолжаем выполнение функции
if data.startswith("region_"):
region_id = data.split("_")[1]
bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=None)
handle_region_selection(call, region_id) # Продолжаем выполнение функции после выбора региона
# Если была нажата кнопка для переключения страниц
elif data.startswith("prev_") or data.startswith("next_"):
direction, index = data.split("_")
index = int(index)
# Рассчитываем новый индекс страницы
start_index = max(0, index - regions_per_page) if direction == "prev" else min(len(regions) - regions_per_page,
index)
# Обновляем клавиатуру для новой страницы
markup = create_region_keyboard(regions, start_index, regions_per_page)
bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=markup)
bot.answer_callback_query(call.id)
# Фаза 2: Обработка выбора региона и предложить выбор группы
def handle_region_selection(call, region_id):
chat_id = call.message.chat.id
try:
# Получаем группы хостов для выбранного региона
zapi = ZabbixAPI(ZABBIX_URL)
zapi.login(api_token=ZABBIX_API_TOKEN)
host_groups = zapi.hostgroup.get(output=["groupid", "name"], search={"name": region_id})
filtered_groups = [group for group in host_groups if 'test' not in group['name'].lower() and f'_{region_id}' in group['name']]
# Если нет групп
if not filtered_groups:
bot.send_message(chat_id, "Нет групп хостов для этого региона.")
show_main_menu(chat_id)
return
# Создаем клавиатуру с выбором группы или всех групп
markup = types.InlineKeyboardMarkup()
for group in filtered_groups:
markup.add(types.InlineKeyboardButton(text=group['name'], callback_data=f"group_{group['groupid']}"))
markup.add(types.InlineKeyboardButton(text="Все группы региона\n(Долгое выполнение)", callback_data=f"all_groups_{region_id}"))
bot.send_message(chat_id, "Выберите группу хостов или получите события по всем группам региона:", reply_markup=markup)
except Exception as e:
bot.send_message(chat_id, f"Ошибка при подключении к Zabbix API.\n{str(e)}")
show_main_menu(chat_id)
# Фаза 3: Обработка выбора группы/всех групп и запрос периода
@bot.callback_query_handler(func=lambda call: call.data.startswith("group_") or call.data.startswith("all_groups_"))
def handle_group_or_all_groups(call):
chat_id = call.message.chat.id
message_id = call.message.message_id
# Убираем клавиатуру после выбора группы
bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=None)
# Если выбрана конкретная группа
if call.data.startswith("group_"):
group_id = call.data.split("_")[1]
get_triggers_for_group(chat_id, group_id) # Сразу получаем события для группы
show_main_menu(chat_id)
# Если выбраны все группы региона
elif call.data.startswith("all_groups_"):
region_id = call.data.split("_")[2]
get_triggers_for_all_groups(chat_id, region_id) # Сразу получаем события для всех групп региона
show_main_menu(chat_id)
# Вспомогательная функция: получение событий для группы
def get_triggers_for_group(chat_id, group_id):
triggers = get_zabbix_triggers(group_id) # Получаем все активные события без периода
if not triggers:
bot.send_message(chat_id, f"Нет активных событий.")
show_main_menu(chat_id)
else:
send_triggers_to_user(triggers, chat_id)
def get_triggers_for_all_groups(chat_id, region_id):
try:
zapi = ZabbixAPI(ZABBIX_URL)
zapi.login(api_token=ZABBIX_API_TOKEN)
host_groups = zapi.hostgroup.get(output=["groupid", "name"], search={"name": region_id})
filtered_groups = [group for group in host_groups if 'test' not in group['name'].lower()]
all_triggers = []
for group in filtered_groups:
triggers = get_zabbix_triggers(group['groupid'])
if triggers:
all_triggers.extend(triggers)
if all_triggers:
send_triggers_to_user(all_triggers, chat_id)
else:
bot.send_message(chat_id, f"Нет активных событий.")
show_main_menu(chat_id)
except Exception as e:
bot.send_message(chat_id, f"Ошибка при получении событий.\n{str(e)}")
show_main_menu(chat_id)
# Вспомогательная функция: отправка событий пользователю
def send_triggers_to_user(triggers, chat_id):
for trigger in triggers:
bot.send_message(chat_id, trigger, parse_mode="html")
time.sleep(1 / 5)
def escape_telegram_chars(text):
"""
Экранирует запрещённые символы для Telegram API:
< -> <
> -> >
& -> &
Также проверяет на наличие запрещённых HTML-тегов и другие проблемы с форматированием.
"""
replacements = {
'&': '&',
'<': '<',
'>': '>',
'"': '"', # Для кавычек
}
# Применяем замены
for char, replacement in replacements.items():
text = text.replace(char, replacement)
return text
def get_zabbix_triggers(group_id):
try:
zapi = ZabbixAPI(ZABBIX_URL)
zapi.login(api_token=ZABBIX_API_TOKEN)
telebot.logger.info(f"Fetching triggers for group {group_id}")
# Получение событий
triggers = zapi.trigger.get(
output=["triggerid", "description", "priority"],
selectHosts=["hostid", "name"],
groupids=group_id,
filter={"priority": ["4", "5"], "value": "1"}, # Высокий приоритет и авария
only_true=1,
active=1,
withLastEventUnacknowledged=1,
expandDescription=1,
expandComment=1,
selectItems=["itemid", "lastvalue"],
selectLastEvent=["clock"]
)
telebot.logger.info(f"Found {len(triggers)} triggers for group {group_id}")
# Московское время
moskva_tz = timezone('Europe/Moscow')
priority_map = {
'4': 'HIGH',
'5': 'DISASTER'
}
trigger_messages = []
for trigger in triggers:
event_time_epoch = int(trigger['lastEvent']['clock'])
event_time = datetime.fromtimestamp(event_time_epoch, tz=moskva_tz)
description = escape_telegram_chars(trigger['description'])
host = trigger['hosts'][0]['name']
priority = priority_map.get(trigger['priority'], 'Неизвестно')
# Получаем itemids
item_ids = [item['itemid'] for item in trigger['items']]
telebot.logger.info(f"Trigger {trigger['triggerid']} on host {host} has itemids: {item_ids}")
# Формируем ссылку для batchgraph
batchgraph_link = f"{ZABBIX_URL}/history.php?action=batchgraph&"
batchgraph_link += "&".join([f"itemids[{item_id}]={item_id}" for item_id in item_ids])
batchgraph_link += "&graphtype=0"
# Заменяем {HOST.NAME} на имя хоста
description = description.replace("{HOST.NAME}", host)
# Заменяем {ITEM.LASTVALUE1} и другие на соответствующие значения
for i, item in enumerate(trigger['items']):
lastvalue_placeholder = f"{{ITEM.LASTVALUE{i + 1}}}"
if lastvalue_placeholder in description:
description = description.replace(lastvalue_placeholder, item['lastvalue'])
event_time_formatted = event_time.strftime('%Y-%m-%d %H:%M:%S Мск')
message = (f"Host: {host}\n"
f"Описание: {description}\n"
f"Критичность: {priority}\n"
f"Время создания: {event_time_formatted}\n"
f'URL: Ссылка на график')
trigger_messages.append(message)
return trigger_messages
except Exception as e:
telebot.logger.error(f"Error fetching triggers for group {group_id}: {e}")
return None
@app.route(BASE_URL + '/webhook', methods=['POST'])
def webhook():
try:
# Получаем данные и логируем
data = request.get_json()
app.logger.info(f"Получены данные: {data}")
# Генерация хеша события и логирование
event_hash = hash_data(data)
app.logger.debug(f"Сгенерирован хеш для события: {event_hash}")
# Работа с базой данных в блоке синхронизации
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Проверяем количество записей в таблице событий
cursor.execute('SELECT COUNT(*) FROM events')
count = cursor.fetchone()[0]
app.logger.debug(f"Текущее количество записей в таблице events: {count}")
# Если записей >= 200, удаляем самое старое событие
if count >= 200:
query = 'DELETE FROM events WHERE id = (SELECT MIN(id) FROM events)'
app.logger.debug(f"Удаление старого события: {query}")
cursor.execute(query)
# Извлечение номера региона из поля host
region_id = extract_region_number(data.get("host"))
if region_id is None:
app.logger.error(f"Не удалось извлечь номер региона из host: {data.get('host')}")
return jsonify({"status": "error", "message": "Invalid host format"}), 400
app.logger.info(f"Извлечён номер региона: {region_id}")
# Запрос подписчиков для отправки уведомления в зависимости от уровня опасности
if data['severity'] == '5': # Авария
query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE'
else: # Высокая опасность
query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE AND disaster_only = FALSE'
app.logger.debug(f"Выполнение запроса: {query} для region_id={region_id}")
cursor.execute(query, (region_id,))
results = cursor.fetchall()
app.logger.info(f"Найдено подписчиков: {len(results)} для региона {region_id}")
# Проверка статуса региона (активен или нет)
query = 'SELECT active FROM regions WHERE region_id = ?'
cursor.execute(query, (region_id,))
region_row = cursor.fetchone()
if region_row and region_row[0]: # Если регион активен
app.logger.info(f"Регион {region_id} активен. Начинаем рассылку сообщений.")
message = format_message(data)
undelivered = False
# Отправляем сообщения подписчикам
for chat_id, username in results:
formatted_message = message.replace('\n',' ').replace('\r','')
app.logger.info(f"Отправка сообщения пользователю @{username} (chat_id={chat_id}) [{formatted_message}]")
try:
send_to_queue({'chat_id': chat_id, 'username': username, 'message': message})
app.logger.info(f"Сообщение поставлено в очередь для {chat_id} (@{username})")
except Exception as e:
app.logger.error(f"Ошибка при отправке сообщения для {chat_id} (@{username}): {e}")
undelivered = True
# Сохранение события, если были проблемы с доставкой
if undelivered:
query = 'INSERT OR IGNORE INTO events (hash, data, delivered) VALUES (?, ?, ?)'
app.logger.debug(f"Сохранение события в базе данных: {query} (hash={event_hash}, delivered={False})")
cursor.execute(query, (event_hash, str(data), False))
# Коммитим изменения в базе данных
conn.commit()
app.logger.info("Изменения в базе данных успешно сохранены.")
conn.close()
# Возвращаем успешный ответ
return jsonify({"status": "success"}), 200
except sqlite3.OperationalError as e:
app.logger.error(f"Ошибка операции с базой данных: {e}")
return jsonify({"status": "error", "message": "Ошибка работы с базой данных"}), 500
except ValueError as e:
app.logger.error(f"Ошибка значения: {e}")
return jsonify({"status": "error", "message": "Некорректные данные"}), 400
except Exception as e:
app.logger.error(f"Неожиданная ошибка: {e}")
return jsonify({"status": "error", "message": "Внутренняя ошибка сервера"}), 500
def format_message(data):
try:
priority_map = {
'4': 'Высокая',
'5': 'Авария'
}
priority = priority_map.get(data['severity'], 'Неизвестно')
if data['status'].upper() == "PROBLEM":
message = (
f"⚠️ {data['host']} ({data['ip']})\n"
f"Описание: {data['msg']}\n"
f"Критичность: {data['severity']}"
)
if 'link' in data:
message += f'\nURL: Ссылка на график'
return message
else:
message = (
f"✅ {data['host']} ({data['ip']})\n"
f"Описание: {data['msg']}\n"
f"Критичность: {data['severity']}\n"
f"Проблема устранена!\n"
f"Время устранения проблемы: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))}\n"
)
if 'link' in data:
message += f'URL: Ссылка на график'
return message
except KeyError as e:
app.logger.error(f"Missing key in data: {e}")
raise ValueError(f"Missing key in data: {e}")
def validate_chat_id(chat_id):
"""Validate that chat_id is composed only of digits."""
return chat_id.isdigit()
def validate_telegram_id(telegram_id):
"""Validate that telegram_id starts with '@'."""
return telegram_id.startswith('@')
def validate_email(email):
"""Validate that email domain is '@rtmis.ru'."""
return re.match(r'^[\w.-]+@rtmis\.ru$', email) is not None
# Маршрут для добавления пользователя
@app.route(BASE_URL + '/users/add', methods=['POST'])
def add_user():
data = request.get_json()
telegram_id = data.get('telegram_id')
chat_id = data.get('chat_id')
user_email = data.get('user_email')
# DEBUG: Логирование полученных данных
app.logger.debug(f"Получены данные для добавления пользователя: {data}")
# Валидация данных
if not validate_chat_id(chat_id):
app.logger.warning(f"Ошибка валидации: некорректный chat_id: {chat_id}")
return jsonify({"status": "failure", "reason": "Invalid data chat_id must be digit"}), 400
if not validate_telegram_id(telegram_id):
app.logger.warning(f"Ошибка валидации: некорректный telegram_id: {telegram_id}")
return jsonify({"status": "failure", "reason": "Invalid data telegram id must start from '@'"}), 400
if not validate_email(user_email):
app.logger.warning(f"Ошибка валидации: некорректный email: {user_email}")
return jsonify({"status": "failure", "reason": "Invalid data email address must be from rtmis"}), 400
if telegram_id and chat_id and user_email:
try:
# INFO: Попытка отправить сообщение пользователю
app.logger.info(f"Отправка сообщения пользователю {telegram_id} с chat_id {chat_id}")
bot.send_message(chat_id, "Регистрация пройдена успешно.")
# DEBUG: Попытка добавления пользователя в whitelist
app.logger.debug(f"Добавление пользователя {telegram_id} в whitelist")
success = rundeck_add_to_whitelist(chat_id, telegram_id, user_email)
if success:
# INFO: Пользователь успешно добавлен в whitelist
app.logger.info(f"Пользователь {telegram_id} добавлен в whitelist.")
user_state_manager.set_state(chat_id, "MAIN_MENU")
# DEBUG: Показ основного меню пользователю
app.logger.debug(f"Отображение основного меню для пользователя с chat_id {chat_id}")
show_main_menu(chat_id)
return jsonify({"status": "success", "msg": f"User {telegram_id} with {user_email} added successfully"}), 200
else:
# INFO: Пользователь уже существует в системе
app.logger.info(f"Пользователь с chat_id {chat_id} уже существует.")
return jsonify({"status": "failure", "msg": "User already exists"}), 400
except telebot.apihelper.ApiTelegramException as e:
if e.result.status_code == 403:
# INFO: Пользователь заблокировал бота
app.logger.info(f"Пользователь {telegram_id} заблокировал бота")
return jsonify({"status": "failure", "msg": f"User {telegram_id} is blocked chat with bot"})
elif e.result.status_code == 400:
# WARNING: Пользователь неизвестен боту, возможно не нажал /start
app.logger.warning(f"Пользователь {telegram_id} с chat_id {chat_id} неизвестен боту, возможно, не нажал /start")
return jsonify({"status": "failure", "msg": f"User {telegram_id} with {chat_id} is unknown to the bot, did the user press /start button?"})
else:
# ERROR: Неизвестная ошибка при отправке сообщения
app.logger.error(f"Ошибка при отправке сообщения пользователю {telegram_id}: {str(e)}")
return jsonify({"status": "failure", "msg": f"{e}"})
else:
# ERROR: Ошибка валидации — недостаточно данных
app.logger.error("Получены некорректные данные для добавления пользователя.")
return jsonify({"status": "failure", "reason": "Invalid data"}), 400
@app.route(BASE_URL + '/users/del', methods=['POST'])
def delete_user():
data = request.get_json()
user_email = data.get('email')
conn = sqlite3.connect(DB_PATH)
try:
# DEBUG: Получен запрос и начинается обработка
app.logger.debug(f"Получен запрос на удаление пользователя. Данные: {data}")
if not user_email:
# WARNING: Ошибка валидации данных, email отсутствует
app.logger.warning(f"Ошибка валидации: отсутствует email")
return jsonify({"status": "failure", "message": "Email is required"}), 400
cursor = conn.cursor()
# DEBUG: Запрос на получение chat_id
app.logger.debug(f"Выполняется запрос на получение chat_id для email: {user_email}")
cursor.execute("SELECT chat_id FROM whitelist WHERE user_email = ?", (user_email,))
user = cursor.fetchone()
if user is None:
# WARNING: Пользователь с указанным email не найден
app.logger.warning(f"Пользователь с email {user_email} не найден")
return jsonify({"status": "failure", "message": "User not found"}), 404
chat_id = user[0]
# INFO: Удаление пользователя и его подписок начато
app.logger.info(f"Начато удаление пользователя с email {user_email} и всех его подписок")
# DEBUG: Удаление пользователя из whitelist
app.logger.debug(f"Удаление пользователя с email {user_email} из whitelist")
cursor.execute("DELETE FROM whitelist WHERE user_email = ?", (user_email,))
# DEBUG: Удаление подписок пользователя
app.logger.debug(f"Удаление подписок для пользователя с chat_id {chat_id}")
cursor.execute("DELETE FROM subscriptions WHERE chat_id = ?", (chat_id,))
conn.commit()
# INFO: Пользователь и подписки успешно удалены
app.logger.info(f"Пользователь с email {user_email} и все его подписки успешно удалены")
return jsonify(
{"status": "success", "message": f"User with email {user_email} and all subscriptions deleted."}), 200
except Exception as e:
conn.rollback()
# ERROR: Ошибка при удалении данных
app.logger.error(f"Ошибка при удалении пользователя с email {user_email}: {str(e)}")
return jsonify({"status": "failure", "message": str(e)}), 500
finally:
conn.close()
# DEBUG: Соединение с базой данных закрыто
app.logger.debug(f"Соединение с базой данных закрыто")
# Маршрут для получения информации о пользователях
@app.route(BASE_URL + '/users/get', methods=['GET'])
def get_users():
try:
# INFO: Запрос на получение списка пользователей
app.logger.info("Запрос на получение информации о пользователях получен")
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# DEBUG: Запрос данных из таблицы whitelist
app.logger.debug("Запрос данных пользователей из таблицы whitelist")
cursor.execute('SELECT * FROM whitelist')
users = cursor.fetchall()
# DEBUG: Формирование словаря пользователей
app.logger.debug("Формирование словаря пользователей")
users_dict = {id: {'id': id, 'username': username, 'email': email, 'events': [], 'worker': '', 'subscriptions': []}
for id, username, email in users}
# DEBUG: Запрос данных событий пользователей
app.logger.debug("Запрос событий пользователей из таблицы user_events")
cursor.execute('SELECT chat_id, username, action, timestamp FROM user_events')
events = cursor.fetchall()
# DEBUG: Обработка событий и добавление их в словарь пользователей
for chat_id, username, action, timestamp in events:
if chat_id in users_dict:
event = {'type': action, 'date': timestamp}
if "Subscribed to region" in action:
region = action.split(": ")[-1]
event['region'] = region
users_dict[chat_id]['events'].append(event)
# DEBUG: Запрос данных подписок пользователей
app.logger.debug("Запрос активных подписок пользователей из таблицы subscriptions")
cursor.execute('SELECT chat_id, region_id FROM subscriptions WHERE active = 1')
subscriptions = cursor.fetchall()
# DEBUG: Добавление подписок к пользователям
for chat_id, region_id in subscriptions:
if chat_id in users_dict:
users_dict[chat_id]['subscriptions'].append(str(region_id))
# INFO: Формирование результата
app.logger.info("Формирование результата для ответа")
result = []
for user in users_dict.values():
ordered_user = {
'email': user['email'],
'username': user['username'],
'id': user['id'],
'worker': user['worker'],
'events': user['events'],
'subscriptions': ', '.join(user['subscriptions'])
}
result.append(ordered_user)
# INFO: Успешная отправка данных пользователей
app.logger.info("Информация о пользователях успешно отправлена")
return jsonify(result)
except Exception as e:
# ERROR: Ошибка при получении информации о пользователях
app.logger.error(f"Ошибка при получении информации о пользователях: {str(e)}")
return jsonify({'status': 'error', 'message': str(e)}), 500
# Маршрут для отображения HTML-страницы с информацией о пользователях
@app.route(BASE_URL + '/users', methods=['GET'])
def view_users():
return render_template('users.html')
# Маршрут для добавления региона
@app.route(BASE_URL + '/regions/add', methods=['POST'])
def add_region():
data = request.json
region_id: int = data.get('region_id')
region_name: str = data.get('region_name')
if not region_id or not region_name:
return jsonify({"status": "error", "message": "Invalid input"}), 400
result = region_api.add_region(region_id, region_name)
return jsonify(result)
# Маршрут для удаления региона
@app.route(BASE_URL + '/regions/del', methods=['POST'])
def del_region():
data = request.json
region_id = data.get('region_id')
if not region_id:
return jsonify({"status": "error", "message": "Invalid input"}), 400
result = region_api.remove_region(region_id)
return jsonify(result)
# Маршрут для получения списка регионов
@app.route(BASE_URL + '/regions/get', methods=['GET'])
def get_regions():
regions = region_api.get_regions()
return jsonify(regions)
@app.route(BASE_URL + '/regions/edit', methods=['POST'])
def edit_region():
data = request.json
region_id = data.get('region_id')
active = data.get('active')
# Проверка валидности данных
if not region_id and active:
return jsonify({"status": "error", "message": "Invalid data received"}), 400
# Обновление региона
result = region_api.update_region_status(region_id, active)
return jsonify(result)
# Маршрут для рендеринга страницы управления регионами
@app.route(BASE_URL + '/regions', methods=['GET'])
def regions_page():
return render_template('regions.html')
# Управление уровнями логирования для Flask
@app.route(BASE_URL + '/debug/flask', methods=['POST'])
def toggle_flask_debug():
try:
data = request.get_json()
level = data.get('level').upper()
if level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
return jsonify({'status': 'error', 'message': 'Invalid log level'}), 400
log_level = getattr(logging, level, logging.DEBUG)
app.logger.setLevel(log_level)
for handler in app.logger.handlers:
handler.setLevel(log_level)
return jsonify({'status': 'success', 'level': level})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 500
# Управление уровнями логирования для Telebot
@app.route(BASE_URL + '/debug/telebot', methods=['POST'])
def toggle_telebot_debug():
try:
data = request.get_json()
level = data.get('level').upper()
if level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
return jsonify({'status': 'error', 'message': 'Invalid log level'}), 400
log_level = getattr(logging, level, logging.DEBUG)
telebot.logger.setLevel(log_level)
for handler in telebot.logger.handlers:
handler.setLevel(log_level)
return jsonify({'status': 'success', 'level': level})
except Exception as e:
return jsonify({'status': 'error', 'message': str(e)}), 500
# Test functions for admin
def simulate_event(message):
chat_id = message.chat.id
test_event = {
"host": "12",
"msg": "Тестовое сообщение",
"date_reception": "12757175125",
"severity": "5",
"tags": "OTC,OFS,NFS",
"status": "Авария!"
}
app.logger.info(f"Simulating event: {test_event}")
# Use requests to simulate a POST request
response = requests.post('http://localhost:5000/webhook', json=test_event)
app.logger.info(f"Response from webhook: {response.status_code} - {response.text}")
bot.send_message(chat_id, f"Тестовое событие отправлено. Статус ответа: {response.status_code}")
def simulate_triggers(message):
chat_id = message.chat.id
regions = ["12", "19", "35", "40"]
trigger_messages = []
for region_id in regions:
triggers = get_zabbix_triggers(region_id)
if triggers:
trigger_messages.append(f"Регион {region_id}:\n{triggers}")
if trigger_messages:
bot.send_message(chat_id, "\n\n".join(trigger_messages), parse_mode="html")
else:
bot.send_message(chat_id, "Нет активных проблем по указанным регионам.")
def run_polling():
bot.infinity_polling(timeout=10, long_polling_timeout = 5)
# Запуск Flask-приложения
def run_flask():
app.run(port=5000, host='0.0.0.0', debug=True, use_reloader=False)
# Основная функция для запуска
def main():
# Инициализация базы данных
init_db()
# Запуск Flask и бота в отдельных потоках
Thread(target=run_flask, daemon=True).start()
Thread(target=run_polling, daemon=True).start()
# Запуск асинхронных задач
asyncio.run(consume_from_queue())
if __name__ == '__main__':
main()