Need broke them all and fix again

This commit is contained in:
Влад Зверев 2025-02-09 09:58:02 +05:00
parent c72df3fd00
commit bfe4b1e938
5 changed files with 352 additions and 97 deletions

View File

@ -8,4 +8,4 @@
/telezab.db
/db/
/db/telezab.db
/trash/

90
rabbitmq_worker.py Normal file
View File

@ -0,0 +1,90 @@
import os
import asyncio
import logging
import json
import time
import pika
from concurrent.futures import ThreadPoolExecutor
from functools import partial
# RabbitMQ configuration
RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost')
RABBITMQ_QUEUE = 'telegram_notifications'
RABBITMQ_LOGIN = os.getenv('RABBITMQ_LOGIN')
RABBITMQ_PASS = os.getenv('RABBITMQ_PASS')
# Импорт функций для отправки сообщений
from telezab import send_notification_message # Замените на актуальный путь к send_notification_message
class RabbitMQWorker:
def __init__(self):
self.connection = None
self.channel = None
def rabbitmq_connection(self):
"""Устанавливает подключение к RabbitMQ."""
try:
credentials = pika.PlainCredentials(RABBITMQ_LOGIN, RABBITMQ_PASS)
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300
)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
logging.info("RabbitMQ connection established.")
except Exception as e:
logging.error(f"Error establishing RabbitMQ connection: {e}")
self.connection = None
self.channel = None
async def consume_from_queue(self):
"""Основной цикл обработки сообщений из RabbitMQ."""
while True:
try:
if not self.connection or self.connection.is_closed:
self.rabbitmq_connection()
for method_frame, properties, body in self.channel.consume(RABBITMQ_QUEUE, inactivity_timeout=5):
if not method_frame:
continue # Нет новых сообщений, продолжаем ждать
# Декодируем сообщение из очереди
message = json.loads(body)
chat_id = message['chat_id']
username = message['username']
message_text = message['message']
try:
# Отправляем сообщение
await send_notification_message(chat_id, message_text, username)
self.channel.basic_ack(method_frame.delivery_tag) # Подтверждаем получение
except Exception as e:
logging.error(f"Error processing message: {e}")
self.channel.basic_nack(method_frame.delivery_tag) # Возвращаем сообщение в очередь
except pika.exceptions.AMQPConnectionError as e:
logging.error(f"RabbitMQ connection error: {e}. Reconnecting in 5 seconds...")
self.close_connection()
await asyncio.sleep(5)
except Exception as e:
logging.error(f"Unexpected error in RabbitMQ consumer: {e}")
await asyncio.sleep(5)
def close_connection(self):
"""Закрывает соединение с RabbitMQ."""
if self.connection and not self.connection.is_closed:
self.connection.close()
logging.info("RabbitMQ connection closed.")
async def run(self):
"""Запускает основной цикл обработки очереди."""
try:
await self.consume_from_queue()
except asyncio.CancelledError:
logging.info("RabbitMQ consumer stopped.")
finally:
self.close_connection()

Binary file not shown.

View File

@ -17,6 +17,7 @@ import time
import asyncio
import aiohttp
import pika
import aio_pika
import json
from concurrent.futures import ThreadPoolExecutor
from pyzabbix import ZabbixAPI
@ -27,35 +28,13 @@ import re
import urllib.parse
from log_manager import LogManager
# from rabbitmq_worker import RABBITMQ_LOGIN
from region_api import RegionAPI
from user_state_manager import UserStateManager
# Load environment variables
load_dotenv()
# Функция для загрузки значения из файла
def load_value_from_file(file_name):
try:
with open(file_name, 'r') as file:
return file.read().strip()
except FileNotFoundError:
return None
# Функция для получения переменной из окружения или файла
# def get_variable_value(variable_name):
# # Попытка получить значение из окружения
# value = os.getenv(variable_name)
#
# # Если переменная окружения не установлена, попробуем загрузить из файла
# if not value:
# file_value = "file_" + variable_name
# value = os.getenv(file_value)
# with open(value, 'r') as file:
# value = file.read()
# return value
# return value
#
DEV = os.getenv('DEV')
# Загрузка переменных окружения или значений из файлов
TOKEN = os.getenv('TELEGRAM_TOKEN')
@ -70,6 +49,7 @@ RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', 'localhost')
RABBITMQ_QUEUE = 'telegram_notifications'
RABBITMQ_LOGIN = os.getenv('RABBITMQ_LOGIN')
RABBITMQ_PASS = os.getenv('RABBITMQ_PASS')
RABBITMQ_URL = "amqp://"+ RABBITMQ_LOGIN + ":" + RABBITMQ_PASS + "@" + RABBITMQ_HOST +"/" # Замените на ваш URL RabbitMQ
# Инициализируем класс RegionApi
region_api = RegionAPI(DB_PATH)
# Инициализируем класс UserStateManager
@ -101,64 +81,74 @@ rate_limit_semaphore = asyncio.Semaphore(25) # 25 messages per second
def init_db():
try:
# 1⃣ Проверяем и создаём каталог, если его нет
db_dir = os.path.dirname(DB_PATH)
if not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True) # Создаём каталог рекурсивно
# 2⃣ Проверяем, существует ли файл базы данных
db_exists = os.path.exists(DB_PATH)
# 3⃣ Открываем соединение, если файла нет, он создастся автоматически
with db_lock:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Create events table
cursor.execute('''CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash TEXT UNIQUE,
data TEXT,
delivered BOOLEAN)''')
# 4⃣ Если базы не было, создаём таблицы
if not db_exists:
cursor.execute('''CREATE TABLE events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash TEXT UNIQUE,
data TEXT,
delivered BOOLEAN)''')
# Create subscriptions table with username and active flag
cursor.execute('''CREATE TABLE IF NOT EXISTS subscriptions (
chat_id INTEGER,
region_id TEXT,
username TEXT,
active BOOLEAN DEFAULT TRUE,
skip BOOLEAN DEFAULT FALSE,
disaster_only BOOLEAN DEFAULT FALSE,
UNIQUE(chat_id, region_id))''')
cursor.execute('''CREATE TABLE subscriptions (
chat_id INTEGER,
region_id TEXT,
username TEXT,
active BOOLEAN DEFAULT TRUE,
skip BOOLEAN DEFAULT FALSE,
disaster_only BOOLEAN DEFAULT FALSE,
UNIQUE(chat_id, region_id))''')
# Create whitelist table
cursor.execute('''CREATE TABLE IF NOT EXISTS whitelist (
chat_id INTEGER PRIMARY KEY,
username TEXT,
user_email TEXT)''')
cursor.execute('''CREATE TABLE whitelist (
chat_id INTEGER PRIMARY KEY,
username TEXT,
user_email TEXT)''')
# Create whitelist table
cursor.execute('''CREATE TABLE IF NOT EXISTS admins (
chat_id INTEGER PRIMARY KEY,
username TEXT)''')
# Create regions table with active flag
cursor.execute('''CREATE TABLE IF NOT EXISTS regions (
region_id TEXT PRIMARY KEY,
region_name TEXT,
active BOOLEAN DEFAULT TRUE)''')
cursor.execute('''CREATE TABLE admins (
chat_id INTEGER PRIMARY KEY,
username TEXT)''')
# Create user events table for logging
cursor.execute('''CREATE TABLE IF NOT EXISTS user_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER,
username TEXT,
action TEXT,
timestamp TEXT)''')
cursor.execute('''CREATE TABLE regions (
region_id TEXT PRIMARY KEY,
region_name TEXT,
active BOOLEAN DEFAULT TRUE)''')
# Insert sample regions
cursor.execute('''INSERT OR IGNORE INTO regions (region_id, region_name) VALUES
('01', 'Адыгея'),
('02', 'Башкортостан (Уфа)'),
('04', 'Алтай'),
('19', 'Республика Хакасия')''')
cursor.execute('''CREATE TABLE user_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER,
username TEXT,
action TEXT,
timestamp TEXT)''')
# Добавляем тестовые данные (если их нет)
cursor.execute('''INSERT OR IGNORE INTO regions (region_id, region_name) VALUES
('01', 'Адыгея'),
('02', 'Башкортостан (Уфа)'),
('04', 'Алтай'),
('19', 'Республика Хакасия')''')
conn.commit()
app.logger.info("✅ Database created and initialized successfully.")
else:
app.logger.info("✅ Database already exists. Skipping initialization.")
conn.commit()
app.logger.info("Database initialized successfully.")
except Exception as e:
app.logger.error(f"Error initializing database: {e}")
app.logger.error(f"❌ Error initializing database: {e}")
finally:
conn.close()
if 'conn' in locals(): # Проверяем, была ли создана переменная conn
conn.close()
@ -855,24 +845,77 @@ def send_to_queue(message):
connection.close()
# async def consume_from_queue():
# connection, channel = rabbitmq_connection()
#
# for method_frame, properties, body in channel.consume(RABBITMQ_QUEUE):
# message = json.loads(body)
# chat_id = message['chat_id']
# username = message['username']
# message_text = message['message']
#
# try:
# await send_notification_message(chat_id, message_text, username)
# channel.basic_ack(method_frame.delivery_tag)
# except Exception as e:
# telebot.logger.error(f"Error sending message from queue: {e}")
# # Optionally, you can nack the message to requeue it
# # channel.basic_nack(method_frame.delivery_tag)
#
# connection.close()
async def consume_from_queue():
connection, channel = rabbitmq_connection()
for method_frame, properties, body in channel.consume(RABBITMQ_QUEUE):
message = json.loads(body)
chat_id = message['chat_id']
username = message['username']
message_text = message['message']
while True: # Бесконечный цикл для переподключения
try:
await send_notification_message(chat_id, message_text, username)
channel.basic_ack(method_frame.delivery_tag)
except Exception as e:
telebot.logger.error(f"Error sending message from queue: {e}")
# Optionally, you can nack the message to requeue it
# channel.basic_nack(method_frame.delivery_tag)
# Подключение к RabbitMQ
connection = await aio_pika.connect_robust(RABBITMQ_URL)
async with connection:
# Открываем канал
channel = await connection.channel()
# Объявляем очередь (если нужно)
queue = await channel.declare_queue(RABBITMQ_QUEUE, durable=True)
connection.close()
# Потребляем сообщения
async for message in queue:
async with message.process(): # Авто подтверждение сообщения
try:
# Парсинг сообщения
data = json.loads(message.body)
# Проверка структуры сообщения
if not isinstance(data, dict):
raise ValueError("Invalid message format: Expected a dictionary")
# Извлечение необходимых данных
chat_id = data.get("chat_id")
username = data.get("username")
message_text = data.get("message")
# Проверка обязательных полей
if not all([chat_id, username, message_text]):
raise ValueError(f"Missing required fields in message: {data}")
# Отправляем сообщение
await send_notification_message(chat_id, message_text, username)
except json.JSONDecodeError:
# Логируем некорректный JSON
telebot.logger.error(f"Failed to decode message: {message.body}")
except ValueError as ve:
# Логируем ошибку формата сообщения
telebot.logger.error(f"Invalid message: {ve}")
except Exception as e:
# Логируем общую ошибку при обработке
telebot.logger.error(f"Error sending message from queue: {e}")
except aio_pika.exceptions.AMQPError as e:
# Логируем ошибку RabbitMQ и переподключаемся
telebot.logger.error(f"RabbitMQ error: {e}")
except Exception as e:
# Логируем общую ошибку и ждем перед переподключением
telebot.logger.error(f"Critical error in consume_from_queue: {e}")
finally:
# Задержка перед переподключением
await asyncio.sleep(5)
async def send_message(chat_id, message, is_notification=False):
@ -892,6 +935,9 @@ async def send_message(chat_id, message, is_notification=False):
telebot.logger.warning(f"Rate limit exceeded for chat_id {chat_id}. Retrying...")
await asyncio.sleep(1)
await send_message(chat_id, message, is_notification)
elif "403" in str(e):
telebot.logger.warning(f"Can't send message to user because bot blocked by user with chat id: {chat_id}")
pass
else:
telebot.logger.error(f"Failed to send message to {chat_id}: {e}")
telebot.logger.error(f"Detailed Error: {e}", exc_info=True) # Добавлено логирование исключения
@ -902,13 +948,14 @@ async def send_message(chat_id, message, is_notification=False):
finally:
if is_notification:
rate_limit_semaphore.release()
formatted_message = message.replace('\n', ' ').replace('\r', '')
telebot.logger.info(f'Send notification to {chat_id} from RabbitMQ [{formatted_message}]')
async def send_notification_message(chat_id, message, username):
await send_message(chat_id, message, is_notification=True)
formatted_message = message.replace('\n', ' ').replace('\r', '')
telebot.logger.info(f'Send notification to {username} {chat_id} from RabbitMQ [{formatted_message}]')
# formatted_message = message.replace('\n', ' ').replace('\r', '')
# telebot.logger.info(f'Send notification to {username} {chat_id} from RabbitMQ [{formatted_message}]')
async def run_in_executor(func, *args):
@ -1309,10 +1356,10 @@ def webhook():
return jsonify({"status": "error", "message": "Invalid host format"}), 400
app.logger.debug(f"Извлечён номер региона: {region_id}")
# Запрос подписчиков для отправки уведомления в зависимости от уровня опасности
if data['severity'] == '5': # Авария
# Запрос подписчиков для отправки уведомления в зависимости от уровня критичности
if data['severity'] == 'Disaster': # Авария
query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE'
else: # Высокая опасность
else: # Высокая критичность
query = 'SELECT chat_id, username FROM subscriptions WHERE region_id = ? AND active = TRUE AND disaster_only = FALSE'
app.logger.debug(f"Выполнение запроса: {query} для region_id={region_id}")
@ -1334,7 +1381,8 @@ def webhook():
# Отправляем сообщения подписчикам
for chat_id, username in results:
formatted_message = message.replace('\n',' ').replace('\r','')
app.logger.info(f"Отправка сообщения пользователю @{username} (chat_id={chat_id}) [{formatted_message}]")
app.logger.info(f"Формирование сообщения для пользователя {username} (chat_id={chat_id}) [{formatted_message}]")
try:
send_to_queue({'chat_id': chat_id, 'username': username, 'message': message})
app.logger.debug(f"Сообщение поставлено в очередь для {chat_id} (@{username})")
@ -1377,13 +1425,13 @@ def format_message(data):
'Disaster': '⛔️'
}
priority = priority_map.get(data['severity'])
msg = escape_telegram_chars(data['msg'])
if data['status'].upper() == "PROBLEM":
message = (
f"{priority} {data['host']} ({data['ip']})\n"
f"<b>Описание</b>: {data['msg']}\n"
f"<b>Описание</b>: {msg}\n"
f"<b>Критичность</b>: {data['severity']}\n"
f"<b>Время возникновения проблемы</b>: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n"
f"<b>Время возникновения</b>: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n"
)
if 'link' in data:
message += f'<b>URL</b>: <a href="{data['link']}">Ссылка на график</a>'
@ -1391,10 +1439,10 @@ def format_message(data):
else:
message = (
f"{data['host']} ({data['ip']})\n"
f"<b>Описание</b>: {data['msg']}\n"
f"<b>Описание</b>: {msg}\n"
f"<b>Критичность</b>: {data['severity']}\n"
f"<b>Проблема устранена!</b>\n"
f"<b>Время устранения проблемы</b>: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n"
f"<b>Время устранения</b>: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(data['date_reception'])))} Мск\n"
)
if 'link' in data:
message += f'<b>URL</b>: <a href="{data['link']}">Ссылка на график</a>'
@ -1760,6 +1808,7 @@ def main():
Thread(target=run_flask, daemon=True).start()
Thread(target=run_polling, daemon=True).start()
# Запуск асинхронных задач
asyncio.run(consume_from_queue())

116
zabbix_manager.py Normal file
View File

@ -0,0 +1,116 @@
import logging
import asyncio
from pyzabbix import ZabbixAPI
from datetime import datetime
from pytz import timezone
import os
class ZabbixTriggerManager:
def __init__(self):
self.zabbix_url = os.getenv('ZABBIX_URL') # URL Zabbix из переменных окружения
self.api_token = os.getenv('ZABBIX_API_TOKEN') # API токен Zabbix из переменных окружения
self.moskva_tz = timezone('Europe/Moscow') # Часовой пояс
self.priority_map = {'4': 'HIGH', '5': 'DISASTER'} # Отображение приоритетов
self.zapi = None # Клиент Zabbix API
def login(self) -> None:
"""
Авторизация в Zabbix API.
"""
try:
self.zapi = ZabbixAPI(self.zabbix_url)
self.zapi.login(api_token=self.api_token)
logging.info("Successfully logged in to Zabbix API.")
except Exception as e:
logging.error(f"Error logging in to Zabbix API: {e}")
raise
def get_problems_for_group(self, group_id: str) -> list:
"""
Получает проблемы для указанной группы хостов.
"""
try:
logging.info(f"Fetching problems for group {group_id}")
problems = self.zapi.problem.get(
output=["eventid", "name", "severity", "clock"],
groupids=group_id,
recent=1, # Только текущие проблемы
# sortfield=["clock"],
# sortorder="DESC"
)
logging.info(f"Found {len(problems)} problems for group {group_id}")
return problems
except Exception as e:
logging.error(f"Error fetching problems for group {group_id}: {e}")
return []
def get_problems_for_region(self, region_id: str) -> list:
"""
Получает проблемы для всех групп, связанных с регионом.
"""
try:
logging.info(f"Fetching host groups for region {region_id}")
host_groups = self.zapi.hostgroup.get(output=["groupid", "name"], search={"name": region_id})
filtered_groups = [
group for group in host_groups
if 'test' not in group['name'].lower() and f'_{region_id}' in group['name']
]
all_problems = []
for group in filtered_groups:
problems = self.get_problems_for_group(group['groupid'])
all_problems.extend(problems)
return all_problems
except Exception as e:
logging.error(f"Error fetching problems for region {region_id}: {e}")
return []
async def send_problems_to_user(self, chat_id: int, problems: list, bot) -> None:
"""
Отправляет список проблем пользователю в Telegram с учётом задержки для обхода лимита API.
"""
if not problems:
logging.info(f"No problems to send to chat_id {chat_id}.")
bot.send_message(chat_id, "Нет активных проблем.")
return
for problem in problems:
try:
# Форматируем время
event_time = datetime.fromtimestamp(int(problem['clock']), tz=self.moskva_tz)
event_time_formatted = event_time.strftime('%Y-%m-%d %H:%M:%S Мск')
# Определяем критичность
severity = self.priority_map.get(problem['severity'], "Неизвестно")
priority_mark = '⚠️' if severity == 'HIGH' else '⛔️' if severity == 'DISASTER' else ''
# Определяем имя хоста
hosts = self._get_problem_hosts(problem)
host = hosts[0]["name"] if hosts else "Неизвестно"
# Генерируем URL для графика
item_ids = [item["itemid"] for item in problem.get("items", [])]
url = f"{self.zabbix_url}/history.php?action=batchgraph&" + \
"&".join([f"itemids[{item_id}]={item_id}" for item_id in item_ids]) + "&graphtype=0"
# Формируем сообщение
description = self.escape_telegram_chars(problem['name'])
message = (
f"{priority_mark} <b>Host</b>: {host}\n"
f"<b>Описание</b>: {description}\n"
f"<b>Критичность</b>: {severity}\n"
f"<b>Время создания</b>: {event_time_formatted}\n"
f'<b>URL</b>: <a href="{url}">Ссылка на график</a>'
)
# Отправляем сообщение
bot.send_message(chat_id, message, parse_mode="HTML")
# Задержка для соблюдения рейтлимита
await asyncio.sleep(0.04) # 40 мс = максимум 25 сообщений в секунду
except Exception as e:
logging.error(f"Error sending problem to chat_id {chat_id}: {e}")