import logging
import asyncio
from pyzabbix import ZabbixAPI
from datetime import datetime
from pytz import timezone
import os
class ZabbixTriggerManager:
def __init__(self):
self.zabbix_url = os.getenv('ZABBIX_URL') # URL Zabbix из переменных окружения
self.api_token = os.getenv('ZABBIX_API_TOKEN') # API токен Zabbix из переменных окружения
self.moskva_tz = timezone('Europe/Moscow') # Часовой пояс
self.priority_map = {'4': 'HIGH', '5': 'DISASTER'} # Отображение приоритетов
self.zapi = None # Клиент Zabbix API
def login(self) -> None:
"""
Авторизация в Zabbix API.
"""
try:
self.zapi = ZabbixAPI(self.zabbix_url)
self.zapi.login(api_token=self.api_token)
logging.info("Successfully logged in to Zabbix API.")
except Exception as e:
logging.error(f"Error logging in to Zabbix API: {e}")
raise
def get_problems_for_group(self, group_id: str) -> list:
"""
Получает проблемы для указанной группы хостов.
"""
try:
logging.info(f"Fetching problems for group {group_id}")
problems = self.zapi.problem.get(
output=["eventid", "name", "severity", "clock"],
groupids=group_id,
recent=1, # Только текущие проблемы
# sortfield=["clock"],
# sortorder="DESC"
)
logging.info(f"Found {len(problems)} problems for group {group_id}")
return problems
except Exception as e:
logging.error(f"Error fetching problems for group {group_id}: {e}")
return []
def get_problems_for_region(self, region_id: str) -> list:
"""
Получает проблемы для всех групп, связанных с регионом.
"""
try:
logging.info(f"Fetching host groups for region {region_id}")
host_groups = self.zapi.hostgroup.get(output=["groupid", "name"], search={"name": region_id})
filtered_groups = [
group for group in host_groups
if 'test' not in group['name'].lower() and f'_{region_id}' in group['name']
]
all_problems = []
for group in filtered_groups:
problems = self.get_problems_for_group(group['groupid'])
all_problems.extend(problems)
return all_problems
except Exception as e:
logging.error(f"Error fetching problems for region {region_id}: {e}")
return []
async def send_problems_to_user(self, chat_id: int, problems: list, bot) -> None:
"""
Отправляет список проблем пользователю в Telegram с учётом задержки для обхода лимита API.
"""
if not problems:
logging.info(f"No problems to send to chat_id {chat_id}.")
bot.send_message(chat_id, "Нет активных проблем.")
return
for problem in problems:
try:
# Форматируем время
event_time = datetime.fromtimestamp(int(problem['clock']), tz=self.moskva_tz)
event_time_formatted = event_time.strftime('%Y-%m-%d %H:%M:%S Мск')
# Определяем критичность
severity = self.priority_map.get(problem['severity'], "Неизвестно")
priority_mark = '⚠️' if severity == 'HIGH' else '⛔️' if severity == 'DISASTER' else ''
# Определяем имя хоста
hosts = self._get_problem_hosts(problem)
host = hosts[0]["name"] if hosts else "Неизвестно"
# Генерируем URL для графика
item_ids = [item["itemid"] for item in problem.get("items", [])]
url = f"{self.zabbix_url}/history.php?action=batchgraph&" + \
"&".join([f"itemids[{item_id}]={item_id}" for item_id in item_ids]) + "&graphtype=0"
# Формируем сообщение
description = self.escape_telegram_chars(problem['name'])
message = (
f"{priority_mark} Host: {host}\n"
f"Описание: {description}\n"
f"Критичность: {severity}\n"
f"Время создания: {event_time_formatted}\n"
f'URL: Ссылка на график'
)
# Отправляем сообщение
bot.send_message(chat_id, message, parse_mode="HTML")
# Задержка для соблюдения рейтлимита
await asyncio.sleep(0.04) # 40 мс = максимум 25 сообщений в секунду
except Exception as e:
logging.error(f"Error sending problem to chat_id {chat_id}: {e}")