UdoChudo 755280d7de
All checks were successful
Build and Push Docker Image / build (push) Successful in 2m15s
Сгенерил базовую модульную структуру спасибо Claude AI
2025-06-25 00:28:00 +05:00

110 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from typing import Optional, Dict, Any
from pyxui import XUI
from pyxui.errors import BadLogin
from bot.config import (
XUI_FULL_ADDRESS,
XUI_PANEL_NAME,
XUI_USERNAME,
XUI_PASSWORD
)
from bot.utils.logging import logger
class XUIService:
"""Сервис для работы с XUI панелью."""
def __init__(self):
self.xui = XUI(
full_address=XUI_FULL_ADDRESS,
panel=XUI_PANEL_NAME,
https=True
)
def login(self) -> None:
"""Авторизация в XUI панели."""
if self.xui.session_string:
logger.debug("Уже залогинен в XUI.")
return
try:
self.xui.login(XUI_USERNAME, XUI_PASSWORD)
logger.info("Успешный вход в XUI.")
except BadLogin:
logger.error("Неверный логин или пароль XUI.")
raise
except Exception as e:
logger.error(f"Ошибка при входе в XUI: {e}")
raise
def get_client(self, inbound_id: int, email: str) -> Optional[Dict[str, Any]]:
"""Получение информации о клиенте."""
try:
self.login()
return self.xui.get_client(inbound_id=inbound_id, email=email)
except Exception as e:
logger.error(f"Ошибка получения клиента {email}: {e}")
return None
def add_vless_client(
self,
inbound_id: int,
email: str,
uuid: str,
telegram_id: str,
enable: bool = True,
flow: str = "xtls-rprx-vision",
limit_ip: int = 0,
total_gb: int = 0,
expire_time: int = 0
) -> bool:
"""Добавление VLESS клиента."""
try:
self.login()
self.xui.add_client(
inbound_id=inbound_id,
email=email,
uuid=uuid,
enable=enable,
flow=flow,
limit_ip=limit_ip,
total_gb=total_gb,
expire_time=expire_time,
telegram_id=telegram_id,
subscription_id=telegram_id
)
logger.info(f"VLESS клиент создан: {email}")
return True
except Exception as e:
logger.error(f"Ошибка создания VLESS клиента {email}: {e}")
return False
def get_inbound(self, inbound_id: int) -> Optional[Dict[str, Any]]:
"""Получение информации об inbound."""
try:
self.login()
return self.xui.get_inbound(inbound_id)
except Exception as e:
logger.error(f"Ошибка получения inbound {inbound_id}: {e}")
return None
def get_cookies(self) -> Dict[str, str]:
"""Получение cookies для API запросов."""
cookies = {}
if hasattr(self.xui, 'session') and self.xui.session and hasattr(self.xui.session, 'cookies'):
for cookie in self.xui.session.cookies:
if cookie.name == 'x-ui':
cookies['x-ui'] = cookie.value
break
if self.xui.session_string:
cookies['x-ui'] = self.xui.session_string
return cookies
def reset_session(self) -> None:
"""Сброс сессии для повторной авторизации."""
self.xui.session_string = None