Add function for grabbing and sending triggers from zabbix to user

Some many fixes
Now i can accept registration requests from my phone :D
This commit is contained in:
Влад Зверев 2024-07-31 18:46:26 +05:00
parent 2c06ccde2e
commit 3838645d7e
3 changed files with 297 additions and 105 deletions

3
.gitignore vendored
View File

@ -3,4 +3,5 @@
/.idea
/TODO.txt
/logs
/__pycache__
/__pycache__
/venv

View File

@ -19,7 +19,4 @@ pyTelegramBotAPI==4.21.0
python-dotenv==1.0.1
pyzabbix==1.3.1
requests==2.32.3
telebot==0.0.5
urllib3==2.2.2
Werkzeug==3.0.3
yarl==1.9.4
pytz~=2024.1

View File

@ -15,6 +15,9 @@ import json
from concurrent.futures import ThreadPoolExecutor
from pyzabbix import ZabbixAPI
import requests
from pytz import timezone
from datetime import datetime, timedelta
import re
# Load environment variables
load_dotenv()
@ -31,11 +34,21 @@ LOG_PATH_EVENTS = os.getenv('LOG_PATH_EVENTS', 'logs/events.log')
os.makedirs(os.path.dirname(LOG_PATH_ERRORS), exist_ok=True)
os.makedirs(os.path.dirname(LOG_PATH_EVENTS), exist_ok=True)
class UTF8StreamHandler(logging.StreamHandler):
def __init__(self, stream=None):
super().__init__(stream)
self.setStream(stream)
def setStream(self, stream):
super().setStream(stream)
if hasattr(stream, 'reconfigure'):
stream.reconfigure(encoding='utf-8')
# Define handlers based on environment variables
handlers = {}
if ENABLE_CONSOLE_LOGGING:
handlers['console'] = {
'class': 'logging.StreamHandler',
'class': 'telezab.UTF8StreamHandler',
'stream': 'ext://sys.stdout', # Output to console
'formatter': 'console',
}
@ -44,11 +57,13 @@ if ENABLE_FILE_LOGGING:
'class': 'logging.FileHandler',
'filename': LOG_PATH_ERRORS,
'formatter': 'error',
'encoding': 'utf-8', # Ensure UTF-8 encoding
}
handlers['file_events'] = {
'class': 'logging.FileHandler',
'filename': LOG_PATH_EVENTS,
'formatter': 'event',
'encoding': 'utf-8', # Ensure UTF-8 encoding
}
# Configure logging
@ -175,13 +190,13 @@ def is_whitelisted(chat_id):
# Add user to whitelist
def add_to_whitelist(chat_id):
def add_to_whitelist(chat_id, username):
with db_lock:
conn = sqlite3.connect('telezab.db')
cursor = conn.cursor()
query = 'INSERT OR IGNORE INTO whitelist (chat_id) VALUES (?)'
app.logger.debug(f"Executing query: {query} with chat_id={chat_id}")
cursor.execute(query, (chat_id,))
query = 'INSERT OR IGNORE INTO whitelist (chat_id, username) VALUES (?, ?)'
app.logger.debug(f"Executing query: {query} with chat_id={chat_id}, username={username}")
cursor.execute(query, (chat_id, username))
conn.commit()
conn.close()
@ -530,11 +545,38 @@ def handle_register(message):
else:
username = "N/A"
markup = telebot.types.ReplyKeyboardMarkup(one_time_keyboard=True, resize_keyboard=True)
markup.add('Подтвердить регистрацию', 'Отмена')
bot.send_message(chat_id, f"Ваш chat ID: {chat_id}, ваше имя пользователя: {username}. Запрос на одобрение отправлен администратору.", reply_markup=markup)
bot.send_message(chat_id,
f"Ваш chat ID: {chat_id}, ваше имя пользователя: {username}. Запрос на одобрение отправлен администратору.")
log_user_event(chat_id, username, "Requested registration")
bot.register_next_step_handler(message, process_register, chat_id, username)
for admin_chat_id in ADMIN_CHAT_IDS:
markup = telebot.types.InlineKeyboardMarkup()
markup.add(
telebot.types.InlineKeyboardButton(text="Подтвердить", callback_data=f"approve_{chat_id}_{username}"),
telebot.types.InlineKeyboardButton(text="Отменить", callback_data=f"decline_{chat_id}_{username}")
)
bot.send_message(
admin_chat_id,
f"Пользователь {username} ({chat_id}) запрашивает регистрацию. Вы подтверждаете это действие?",
reply_markup=markup
)
@bot.callback_query_handler(func=lambda call: call.data.startswith("approve_") or call.data.startswith("decline_"))
def handle_admin_response(call):
action, chat_id, username = call.data.split("_")
if action == "approve":
add_to_whitelist(int(chat_id), username)
bot.send_message(chat_id, "Ваша регистрация одобрена администратором.")
bot.send_message(call.message.chat.id, f"Пользователь {username} ({chat_id}) добавлен в белый список.")
log_user_event(chat_id, username, "Approved registration")
elif action == "decline":
bot.send_message(chat_id, "Ваша регистрация отклонена администратором.")
bot.send_message(call.message.chat.id, f"Пользователь {username} ({chat_id}) отклонен.")
log_user_event(chat_id, username, "Declined registration")
bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None)
bot.answer_callback_query(call.id)
def process_register(message, chat_id, username):
@ -572,6 +614,7 @@ def prompt_admin_for_region(chat_id, action):
def process_add_region(message):
chat_id = message.chat.id
try:
region_id, region_name = message.text.split()[0], ' '.join(message.text.split()[1:])
with db_lock:
@ -593,6 +636,7 @@ def process_add_region(message):
markup = telebot.types.InlineKeyboardMarkup()
markup.add(telebot.types.InlineKeyboardButton(text="Заменить", callback_data=f"replace_{region_id}_{region_name}"))
markup.add(telebot.types.InlineKeyboardButton(text="Активировать старый", callback_data=f"reactivate_{region_id}"))
markup.add(telebot.types.InlineKeyboardButton(text="Отмена", callback_data=f"cancel_region_{chat_id}"))
bot.send_message(chat_id, f"Регион {region_id} уже существует с названием {existing_region_name}. Хотите заменить его или активировать старый регион?", reply_markup=markup)
else:
query = 'INSERT OR IGNORE INTO regions (region_id, region_name) VALUES (?, ?)'
@ -606,7 +650,7 @@ def process_add_region(message):
bot.send_message(chat_id, "Неверный формат. Используйте: <region_id> <region_name>")
@bot.callback_query_handler(func=lambda call: call.data.startswith("replace_") or call.data.startswith("reactivate_"))
@bot.callback_query_handler(func=lambda call: call.data.startswith("replace_") or call.data.startswith("reactivate_") or call.data.startswith("cancel_region_"))
def handle_region_action(call):
parts = call.data.split("_", 2)
action = parts[0]
@ -635,12 +679,14 @@ def handle_region_action(call):
cursor.execute(query, (region_id,))
bot.send_message(chat_id, f"Регион {region_id} активирован.")
app.logger.info(f"Admin {chat_id} reactivated region {region_id}.")
elif action == "cancel_region":
bot.send_message(chat_id, "Действие отменено.")
app.logger.info(f"Admin {chat_id} canceled region action.")
conn.commit()
conn.close()
bot.answer_callback_query(call.id) # Завершение обработки callback
bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None)
show_settings_menu(chat_id)
bot.answer_callback_query(call.id) # Завершение обработки callback
def process_remove_region(message):
chat_id = message.chat.id
@ -672,40 +718,40 @@ def process_remove_region(message):
show_settings_menu(chat_id)
# Handle admin whitelist management commands
def prompt_admin_for_whitelist(chat_id, action):
if action == 'add':
bot.send_message(chat_id, "Введите ID пользователя, которого хотите добавить в белый список")
bot.register_next_step_handler_by_chat_id(chat_id, process_add_whitelist)
elif action == 'remove':
bot.send_message(chat_id, "Введите ID пользователя, которого хотите удалить из белого списка")
bot.register_next_step_handler_by_chat_id(chat_id, process_remove_whitelist)
# # Handle admin whitelist management commands
# def prompt_admin_for_whitelist(chat_id, action):
# if action == 'add':
# bot.send_message(chat_id, "Введите ID пользователя, которого хотите добавить в белый список")
# bot.register_next_step_handler_by_chat_id(chat_id, process_add_whitelist)
# elif action == 'remove':
# bot.send_message(chat_id, "Введите ID пользователя, которого хотите удалить из белого списка")
# bot.register_next_step_handler_by_chat_id(chat_id, process_remove_whitelist)
#
#
# def process_add_whitelist(message):
# chat_id = message.chat.id
# try:
# new_chat_id = int(message.text.split()[0])
# add_to_whitelist(new_chat_id)
# bot.send_message(chat_id, f"Chat ID {new_chat_id} добавлен в белый список.")
# app.logger.info(f"Admin {chat_id} added {new_chat_id} to the whitelist.")
# log_user_event(new_chat_id, "N/A", f"Added to whitelist by {chat_id} (@{message.from_user.username})")
# except (IndexError, ValueError):
# bot.send_message(chat_id, "Неверный формат. Используйте: <chat_id>")
# show_settings_menu(chat_id)
def process_add_whitelist(message):
chat_id = message.chat.id
try:
new_chat_id = int(message.text.split()[0])
add_to_whitelist(new_chat_id)
bot.send_message(chat_id, f"Chat ID {new_chat_id} добавлен в белый список.")
app.logger.info(f"Admin {chat_id} added {new_chat_id} to the whitelist.")
log_user_event(new_chat_id, "N/A", f"Added to whitelist by {chat_id} (@{message.from_user.username})")
except (IndexError, ValueError):
bot.send_message(chat_id, "Неверный формат. Используйте: <chat_id>")
show_settings_menu(chat_id)
def process_remove_whitelist(message):
chat_id = message.chat.id
try:
remove_chat_id = int(message.text.split()[0])
remove_from_whitelist(remove_chat_id)
bot.send_message(chat_id, f"Chat ID {remove_chat_id} удален из белого списка.")
app.logger.info(f"Admin {chat_id} removed {remove_chat_id} from the whitelist.")
log_user_event(remove_chat_id, "N/A", f"Removed from whitelist by {chat_id} (@{message.from_user.username})")
except (IndexError, ValueError):
bot.send_message(chat_id, "Неверный формат. Используйте: <chat_id>")
show_settings_menu(chat_id)
# def process_remove_whitelist(message):
# chat_id = message.chat.id
# try:
# remove_chat_id = int(message.text.split()[0])
# remove_from_whitelist(remove_chat_id)
# bot.send_message(chat_id, f"Chat ID {remove_chat_id} удален из белого списка.")
# app.logger.info(f"Admin {chat_id} removed {remove_chat_id} from the whitelist.")
# log_user_event(remove_chat_id, "N/A", f"Removed from whitelist by {chat_id} (@{message.from_user.username})")
# except (IndexError, ValueError):
# bot.send_message(chat_id, "Неверный формат. Используйте: <chat_id>")
# show_settings_menu(chat_id)
# Handle displaying active subscriptions for a user
@ -828,59 +874,128 @@ async def check_telegram_api():
app.logger.error(f"Error checking Telegram API: {e}")
def extract_region_number(host):
# Используем регулярное выражение для извлечения цифр после первого символа и до первой буквы
match = re.match(r'^.\d+', host)
if match:
return match.group(0)[1:] # Возвращаем строку без первого символа
return None
@app.route('/webhook', methods=['POST'])
def webhook():
data = request.get_json()
app.logger.info(f"Received data: {data}")
try:
data = request.get_json()
app.logger.info(f"Received data: {data}")
event_hash = hash_data(data)
event_hash = hash_data(data)
with db_lock:
conn = sqlite3.connect('telezab.db')
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM events')
count = cursor.fetchone()[0]
with db_lock:
conn = sqlite3.connect('telezab.db')
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM events')
count = cursor.fetchone()[0]
if count >= 200:
query = 'DELETE FROM events WHERE id = (SELECT MIN(id) FROM events)'
app.logger.debug(f"Executing query: {query}")
cursor.execute(query)
if count >= 200:
query = 'DELETE FROM events WHERE id = (SELECT MIN(id) FROM events)'
app.logger.debug(f"Executing query: {query}")
cursor.execute(query)
query = 'INSERT OR IGNORE INTO events (hash, data, delivered) VALUES (?, ?, ?)'
app.logger.debug(f"Executing query: {query} with hash={event_hash}, data={data}, delivered={False}")
cursor.execute(query, (event_hash, str(data), False))
# Извлечение номера региона из поля host
region_id = extract_region_number(data.get("host"))
if region_id is None:
app.logger.error(f"Failed to extract region number from host: {data.get('host')}")
return jsonify({"status": "error", "message": "Invalid host format"}), 400
# Fetch chat_ids to send the alert
region_id = data.get("region")
query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE AND skip = FALSE'
app.logger.debug(f"Executing query: {query} with region_id={region_id}")
cursor.execute(query, (region_id,))
results = cursor.fetchall()
# Fetch chat_ids to send the alert
query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE AND skip = FALSE'
app.logger.debug(f"Executing query: {query} with region_id={region_id}")
cursor.execute(query, (region_id,))
results = cursor.fetchall()
# Check if the region is active
query = 'SELECT active FROM regions WHERE region_id = ?'
cursor.execute(query, (region_id,))
region_active = cursor.fetchone()[0]
# Check if the region is active
query = 'SELECT active FROM regions WHERE region_id = ?'
cursor.execute(query, (region_id,))
region_row = cursor.fetchone()
if region_active:
message = format_message(data)
for chat_id, username in results:
app.logger.debug(f"Queueing message: {message} to chat_id={chat_id}, username={username}")
send_to_queue({'chat_id': chat_id, 'message': message})
app.logger.info(f"Queued alert for {chat_id} ({username}) for region {region_id}")
if region_row and region_row[0]: # Check if region_row is not None and if active
message = format_message(data)
undelivered = False
for chat_id, username in results:
app.logger.debug(f"Queueing message: {message} to chat_id={chat_id}, username={username}")
try:
send_to_queue({'chat_id': chat_id, 'message': message})
app.logger.info(f"Queued alert for {chat_id} ({username}) for region {region_id}")
except Exception as e:
app.logger.error(f"Failed to send message to {chat_id} ({username}): {e}")
undelivered = True
conn.commit()
conn.close()
if undelivered:
query = 'INSERT OR IGNORE INTO events (hash, data, delivered) VALUES (?, ?, ?)'
app.logger.debug(f"Executing query: {query} with hash={event_hash}, data={data}, delivered={False}")
cursor.execute(query, (event_hash, str(data), False))
return jsonify({"status": "success"}), 200
conn.commit()
conn.close()
return jsonify({"status": "success"}), 200
except sqlite3.OperationalError as e:
app.logger.error(f"Database operation error: {e}")
return jsonify({"status": "error", "message": "Database operation error"}), 500
except ValueError as e:
app.logger.error(f"Value error: {e}")
return jsonify({"status": "error", "message": "Invalid data"}), 400
except Exception as e:
app.logger.error(f"Unexpected error: {e}")
return jsonify({"status": "error", "message": "Internal server error"}), 500
def format_message(data):
return (f"Zabbix Alert\n"
f"Host: {data['host']}\n"
f"Item: {data['item']}\n"
f"Trigger: {data['trigger']}\n"
f"Value: {data['value']}")
try:
status_emoji = "⚠️" if data['status'].upper() == "PROBLEM" else ""
priority_map = {
'High': 'Высокая',
'Disaster': 'Авария'
}
priority = priority_map.get(data['severity'], 'Неизвестно')
message = (
f"{status_emoji} Host: {data['host']}\n"
f"Сообщение: {data['msg']}\n"
f"Дата получения: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))}\n"
f"Критичность: {priority}\n"
f"Теги: {data['tags']}\n"
f"Статус: {data['status']}"
)
if 'link' in data:
message += f'\nСсылка на график: {data['link']}'
return message
except KeyError as e:
app.logger.error(f"Missing key in data: {e}")
raise ValueError(f"Missing key in data: {e}")
@app.route('/add_user', methods=['POST'])
def add_user():
data = request.get_json()
telegram_id = data.get('telegram_id')
chat_id = data.get('chat_id')
if telegram_id and chat_id:
add_to_whitelist(chat_id, telegram_id)
return jsonify({"status": "success"}), 200
else:
return jsonify({"status": "failure", "reason": "Invalid data"}), 400
# def format_message(data):
# return (f"Zabbix Alert\n"
# f"Host: {data['host']}\n"
# f"Item: {data['item']}\n"
# f"Trigger: {data['trigger']}\n"
# f"Value: {data['value']}")
# Handle active triggers
@ -933,6 +1048,8 @@ def handle_region_selection(call):
if not filtered_groups:
bot.send_message(chat_id, "Нет групп хостов, соответствующих данному региону.")
bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None)
bot.answer_callback_query(call.id)
return
# Отправка списка групп хостов пользователю в виде кнопок
@ -945,6 +1062,8 @@ def handle_region_selection(call):
logging.error(f"Error connecting to Zabbix API: {e}")
bot.send_message(chat_id, "Не удалось подключиться к Zabbix API. Пожалуйста, попробуйте позже.")
bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None)
bot.answer_callback_query(call.id)
@bot.callback_query_handler(func=lambda call: call.data.startswith("group_"))
def handle_group_selection(call):
@ -959,12 +1078,16 @@ def handle_group_selection(call):
elif not triggers:
bot.send_message(chat_id, "Нет активных проблем по указанной группе за последние 24 часа.")
else:
bot.send_message(chat_id, triggers, parse_mode="Markdown")
for trigger in triggers:
bot.send_message(chat_id, trigger, parse_mode="html")
time.sleep(1/5)
except Exception as e:
logging.error(f"Error processing group selection: {e}")
bot.send_message(chat_id, "Произошла ошибка при обработке вашего запроса. Пожалуйста, попробуйте позже.")
bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=None)
bot.answer_callback_query(call.id)
@bot.callback_query_handler(func=lambda call: call.data.startswith("prev_") or call.data.startswith("next_"))
@ -985,14 +1108,15 @@ def handle_pagination(call):
bot.answer_callback_query(call.id) # Завершение обработки callback
# Функция для получения активных триггеров из Zabbix API
def get_zabbix_triggers(group_id):
try:
# Создайте подключение к вашему Zabbix серверу
zapi = ZabbixAPI(ZABBIX_URL)
zapi.login(api_token=ZABBIX_API_TOKEN)
# Получение триггеров уровня "Высокая" и "Авария" за последние 24 часа для указанной группы хостов
time_from = int(time.time()) - 24 * 3600 # последние 24 часа
time_till = int(time.time())
triggers = zapi.trigger.get(
output=["triggerid", "description", "priority"],
selectHosts=["hostid", "name"],
@ -1002,22 +1126,46 @@ def get_zabbix_triggers(group_id):
active=1,
withLastEventUnacknowledged=1,
time_from=time_from,
time_till=time_till,
expandDescription=1,
expandComment=1,
selectItems=["itemid", "lastvalue"]
selectItems=["itemid", "lastvalue"],
selectLastEvent=["clock"]
)
# Московское время
moskva_tz = timezone('Europe/Moscow')
priority_map = {
'4': 'Высокая',
'5': 'Авария'
}
trigger_messages = []
current_time = datetime.now(moskva_tz)
one_day_ago = current_time - timedelta(days=1)
for trigger in triggers:
# Получаем время последнего события
event_time_epoch = int(trigger['lastEvent']['clock'])
event_time = datetime.fromtimestamp(event_time_epoch, tz=moskva_tz)
# Проверяем, не старше ли событие чем на сутки
if event_time < one_day_ago:
continue
description = trigger['description']
host = trigger['hosts'][0]['name']
priority = priority_map.get(trigger['priority'], 'Неизвестно')
trigger_id = trigger['triggerid']
# Генерация ссылки на график
item_ids = [item['itemid'] for item in trigger['items']]
graph_links = []
for item_id in item_ids:
graph_link = f'<a href="{ZABBIX_URL}/history.php?action=showgraph&itemids[]={item_id}">Ссылка на график</a>'
graph_links.append(graph_link)
graph_links_str = "\n".join(graph_links)
# Заменяем {HOST.NAME} на имя хоста
description = description.replace("{HOST.NAME}", host)
@ -1028,30 +1176,76 @@ def get_zabbix_triggers(group_id):
if lastvalue_placeholder in description:
description = description.replace(lastvalue_placeholder, item['lastvalue'])
message = f"*Host*: {host}\n*Критичность*: {priority}\n*Описание*: {description}"
if trigger_id:
message += f"\n[Ссылка на триггер]({ZABBIX_URL}/tr_events.php?triggerid={trigger_id})"
event_time_formatted = event_time.strftime('%Y-%m-%d %H:%M:%S Мск')
message = (f"<b>Host</b>: {host}\n"
f"<b>Критичность</b>: {priority}\n"
f"<b>Описание</b>: {description}\n"
f"<b>Время создания</b>: {event_time_formatted}\n"
f"{graph_links_str}")
trigger_messages.append(message)
return "\n---\n".join(trigger_messages)
return trigger_messages
except Exception as e:
logging.error(f"Error connecting to Zabbix API: {e}")
return None
# def split_message(message):
# max_length = 4096
# if len(message) <= max_length:
# return [message]
#
# parts = []
# while len(message) > max_length:
# split_index = message[:max_length].rfind('\n')
# if split_index == -1:
# split_index = max_length
# parts.append(message[:split_index])
# message = message[split_index:]
#
# parts.append(message)
# return parts
# def split_message_by_delimiter(messages, delimiter="---"):
# max_length = 4096
# parts = []
# current_part = ""
#
# for message in messages:
# if len(current_part) + len(message) + len(delimiter) <= max_length:
# if current_part:
# current_part += f"\n\n{delimiter}\n\n"
# current_part += message
# else:
# parts.append(current_part)
# current_part = message
#
# if current_part:
# parts.append(current_part)
#
# return parts
async def send_group_messages(chat_id, messages):
for message in messages:
await send_message(chat_id, message, is_notification=True)
await asyncio.sleep(1) # Delay between messages to comply with rate limit
# Test functions for admin
def simulate_event(message):
chat_id = message.chat.id
test_event = {
"region": "12",
"host": "Тестовое сообщение",
"item": "Марий Эл",
"trigger": "Спасайте её",
"value": "Авария!"
"host": "12",
"msg": "Тестовое сообщение",
"date_reception": "12757175125",
"severity": "5",
"tags": "OTC,OFS,NFS",
"status": "Авария!"
}
app.logger.info(f"Simulating event: {test_event}")
# Use requests to simulate a POST request
response = requests.post('http://localhost:5000/webhook', json=test_event)
@ -1069,7 +1263,7 @@ def simulate_triggers(message):
trigger_messages.append(f"Регион {region_id}:\n{triggers}")
if trigger_messages:
bot.send_message(chat_id, "\n\n".join(trigger_messages), parse_mode="Markdown")
bot.send_message(chat_id, "\n\n".join(trigger_messages), parse_mode="html")
else:
bot.send_message(chat_id, "Нет активных проблем по указанным регионам за последние 24 часа.")
@ -1082,7 +1276,7 @@ if __name__ == '__main__':
init_db()
# Start Flask app in a separate thread
Thread(target=app.run, kwargs={'port': 5000, 'host': '0.0.0.0', 'debug': False, 'use_reloader': False}, daemon=True).start()
Thread(target=app.run, kwargs={'port': 5000, 'host': '0.0.0.0', 'debug': True, 'use_reloader': False}, daemon=True).start()
# Start bot polling in a separate thread
Thread(target=run_polling, daemon=True).start()