170 lines
8.0 KiB
Python
170 lines
8.0 KiB
Python
import logging
|
||
import re
|
||
from typing import Dict, List, Optional, Tuple, Any
|
||
|
||
from sqlalchemy.exc import IntegrityError
|
||
from sqlalchemy.orm import joinedload
|
||
from sqlalchemy.orm.query import Query
|
||
from sqlalchemy.orm.scoping import scoped_session
|
||
|
||
from models import Users
|
||
|
||
# Настройка логирования
|
||
logging.basicConfig(level=logging.INFO)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class UserManager:
|
||
def __init__(self, db: scoped_session) -> None:
|
||
self.db: scoped_session = db
|
||
|
||
def get_users(self, page: int, per_page: int) -> Dict[str, Any]:
|
||
logger.debug(f"Получение пользователей: page={page}, per_page={per_page}")
|
||
|
||
users_query: Query = self.db.query(Users).options(joinedload(Users.subscriptions))
|
||
# noinspection PyUnresolvedReferences
|
||
users_paginated: Any = users_query.paginate(page=page, per_page=per_page, error_out=False)
|
||
|
||
users_list: List[Dict[str, Any]] = []
|
||
for user in users_paginated.items:
|
||
user_data: Dict[str, Any] = {
|
||
'chat_id': user.chat_id,
|
||
'telegram_id': user.telegram_id,
|
||
'email': user.user_email,
|
||
'subscriptions': [],
|
||
'disaster_only': "Все уведомления",
|
||
'status': "Активен" if not user.is_blocked else "Заблокирован",
|
||
'blocked': user.is_blocked
|
||
}
|
||
|
||
if user.subscriptions:
|
||
for subscription in user.subscriptions:
|
||
if subscription.active:
|
||
user_data['subscriptions'].append(subscription.region_id)
|
||
if subscription.disaster_only:
|
||
user_data['disaster_only'] = "Только критические уведомления"
|
||
|
||
users_list.append(user_data)
|
||
|
||
logger.debug(f"Получено пользователей: {len(users_list)} элементов")
|
||
|
||
return {
|
||
'users': users_list,
|
||
'total_users': users_paginated.total,
|
||
'total_pages': users_paginated.pages,
|
||
'current_page': users_paginated.page,
|
||
'per_page': users_paginated.per_page
|
||
}
|
||
|
||
def get_user(self, chat_id: int) -> Optional[Dict[str, Any]]:
|
||
logger.debug(f"Получение пользователя: chat_id={chat_id}")
|
||
|
||
user: Optional[Users] = Users.query.filter_by(chat_id=chat_id).first()
|
||
if user:
|
||
user_data: Dict[str, Any] = {
|
||
'chat_id': user.chat_id,
|
||
'telegram_id': user.telegram_id,
|
||
'email': user.user_email,
|
||
'blocked': user.is_blocked
|
||
}
|
||
logger.debug(f"Пользователь найден: chat_id={chat_id}")
|
||
return user_data
|
||
else:
|
||
logger.warning(f"Пользователь не найден: chat_id={chat_id}")
|
||
return None
|
||
|
||
def toggle_block_user(self, chat_id: int) -> bool:
|
||
logger.debug(f"Переключение блокировки пользователя: chat_id={chat_id}")
|
||
|
||
user: Optional[Users] = Users.query.filter_by(chat_id=chat_id).first()
|
||
if user:
|
||
user.is_blocked = not user.is_blocked
|
||
self.db.commit()
|
||
logger.info(f"Пользователь {chat_id} заблокирован")
|
||
return True
|
||
else:
|
||
logger.warning(f"Пользователь не найден: chat_id={chat_id}")
|
||
return False
|
||
|
||
def delete_user(self, chat_id: int) -> bool:
|
||
logger.info(f"Удаление пользователя: chat_id={chat_id}")
|
||
|
||
user: Optional[Users] = Users.query.filter_by(chat_id=chat_id).first()
|
||
if user:
|
||
self.db.delete(user)
|
||
self.db.commit()
|
||
logger.info(f"Пользователь удален: chat_id={chat_id}")
|
||
return True
|
||
else:
|
||
logger.warning(f"Пользователь не найден: chat_id={chat_id}")
|
||
return False
|
||
|
||
def add_user(self, user_data: Dict[str, Any]) -> Tuple[Dict[str, str], Optional[int]]:
|
||
logger.info(f"Добавление пользователя: {user_data}")
|
||
try:
|
||
try:
|
||
chat_id = int(user_data['chat_id'])
|
||
except ValueError:
|
||
logger.warning("Chat ID должен быть числом")
|
||
return {'error': 'Chat ID должен быть числом'}, 400
|
||
|
||
if not re.match(r'^@.*$', user_data['telegram_id']):
|
||
logger.warning("Telegram ID должен начинаться с символа @")
|
||
return {'error': 'Telegram ID должен начинаться с символа @'}, 400
|
||
if not re.match(r'.*@rtmis.ru$', user_data['user_email']):
|
||
logger.warning("Email должен содержать домен @rtmis.ru")
|
||
return {'error': 'Email должен содержать домен @rtmis.ru'}, 400
|
||
|
||
# Проверка на существование пользователя
|
||
if Users.query.filter_by(user_email=user_data['user_email']).first():
|
||
logger.warning(f"Пользователь с email {user_data['user_email']} уже существует")
|
||
return {'error': 'Пользователь с таким email уже существует'}, 409
|
||
if Users.query.filter_by(telegram_id=user_data['telegram_id']).first():
|
||
logger.warning(f"Пользователь с Telegram ID {user_data['telegram_id']} уже существует")
|
||
return {'error': 'Пользователь с таким Telegram ID уже существует'}, 409
|
||
if Users.query.filter_by(chat_id=chat_id).first():
|
||
logger.warning(f"Пользователь с Chat ID {chat_id} уже существует")
|
||
return {'error': 'Пользователь с таким Chat ID уже существует'}, 409
|
||
|
||
new_user: Users = Users(
|
||
chat_id=chat_id,
|
||
telegram_id=user_data['telegram_id'],
|
||
user_email=user_data['user_email'],
|
||
is_blocked=user_data.get('is_blocked', False)
|
||
)
|
||
self.db.add(new_user)
|
||
self.db.commit()
|
||
logger.info(f"Пользователь добавлен успешно: {new_user.user_email}")
|
||
return {'message': 'Пользователь добавлен успешно'}, 201
|
||
except IntegrityError as e:
|
||
self.db.rollback()
|
||
logger.error(f"Ошибка уникальности при добавлении пользователя: {e}")
|
||
return {'error': 'Ошибка уникальности данных'}, 409
|
||
except Exception as e:
|
||
self.db.rollback()
|
||
logger.error(f"Ошибка при добавлении пользователя: {type(e).__name__}: {e}")
|
||
return {'error': 'Ошибка при добавлении пользователя'}, 500
|
||
|
||
def search_users(self, telegram_id: Optional[str] = None, email: Optional[str] = None) -> List[Dict[str, Any]]:
|
||
logger.debug(f"Поиск пользователей: telegram_id={telegram_id}, email={email}")
|
||
|
||
query: Query = self.db.query(Users)
|
||
if telegram_id:
|
||
query = query.filter(Users.telegram_id.ilike(f"%{telegram_id}%"))
|
||
if email:
|
||
query = query.filter(Users.user_email.ilike(f"%{email}%"))
|
||
|
||
users: List[Users] = query.all()
|
||
|
||
users_list: List[Dict[str, Any]] = []
|
||
for user in users:
|
||
user_data: Dict[str, Any] = {
|
||
'chat_id': user.chat_id,
|
||
'telegram_id': user.telegram_id,
|
||
'email': user.user_email,
|
||
'blocked': user.is_blocked
|
||
}
|
||
users_list.append(user_data)
|
||
|
||
logger.debug(f"Найдено пользователей: {len(users_list)}")
|
||
return users_list
|