All checks were successful
Build and Push Docker Images / build (push) Successful in 1m28s
- Рефакторинг Telegram бота на модульную структуру для удобства поддержки и расширения - Создан общий RabbitMQ клиент для Flask и Telegram компонентов - Подготовлена базовая архитектура для будущего масштабирования и новых функций Signed-off-by: UdoChudo <stream@udochudo.ru>
236 lines
11 KiB
Python
236 lines
11 KiB
Python
# app/services/user_service.py
|
||
|
||
import logging
|
||
import re
|
||
from typing import Dict, List, Optional, Tuple, Any
|
||
|
||
from sqlalchemy.exc import IntegrityError
|
||
from sqlalchemy.orm import joinedload
|
||
|
||
from app import db
|
||
from app.extensions.bot_send import bot_send_message
|
||
from app.models import Users # Предполагаем, что app.models/__init__.py экспортирует Users
|
||
from app.extensions.audit_logger import AuditLogger
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
auditlog = AuditLogger(db.session)
|
||
|
||
def get_users(page: int, per_page: int) -> Dict[str, Any]:
|
||
logger.debug(f"Получение пользователей: page={page}, per_page={per_page}")
|
||
users_query = Users.query.options(joinedload(Users.subscriptions))
|
||
users_paginated = users_query.paginate(page=page, per_page=per_page, error_out=False)
|
||
|
||
users_list: List[Dict[str, Any]] = []
|
||
for user in users_paginated.items:
|
||
user_data: Dict[str, Any] = {
|
||
'chat_id': user.chat_id,
|
||
'telegram_id': user.telegram_id,
|
||
'email': user.user_email,
|
||
'subscriptions': [],
|
||
'disaster_only': "Все уведомления",
|
||
'status': "Активен" if not user.is_blocked else "Заблокирован",
|
||
'blocked': user.is_blocked
|
||
}
|
||
|
||
if user.subscriptions:
|
||
for subscription in user.subscriptions:
|
||
if subscription.active and not subscription.skip:
|
||
user_data['subscriptions'].append(subscription.region_id)
|
||
if subscription.disaster_only:
|
||
user_data['disaster_only'] = "Только критические уведомления"
|
||
|
||
users_list.append(user_data)
|
||
logger.debug(f"Получено пользователей: {len(users_list)} элементов")
|
||
return {
|
||
'users': users_list,
|
||
'total_users': users_paginated.total,
|
||
'total_pages': users_paginated.pages,
|
||
'current_page': users_paginated.page,
|
||
'per_page': users_paginated.per_page
|
||
}
|
||
|
||
def get_user(chat_id: int) -> Optional[Dict[str, Any]]:
|
||
logger.debug(f"Получение пользователя: chat_id={chat_id}")
|
||
user: Optional[Users] = Users.query.filter_by(chat_id=chat_id).first()
|
||
if user:
|
||
user_data: Dict[str, Any] = {
|
||
'chat_id': user.chat_id,
|
||
'telegram_id': user.telegram_id,
|
||
'email': user.user_email,
|
||
'blocked': user.is_blocked
|
||
}
|
||
logger.debug(f"Пользователь найден: chat_id={chat_id}")
|
||
return user_data
|
||
else:
|
||
logger.warning(f"Пользователь не найден: chat_id={chat_id}")
|
||
return None
|
||
|
||
def toggle_block_user(chat_id: int, actor_user: Any) -> Tuple[Dict[str, Any], int]:
|
||
logger.debug(f"Переключение блокировки пользователя: chat_id={chat_id}")
|
||
user: Optional[Users] = Users.query.filter_by(chat_id=chat_id).first()
|
||
if not user:
|
||
error_msg = "Пользователь не найден"
|
||
auditlog.users(action_type="toggle_block", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id, error=error_msg)
|
||
logger.warning(f"{error_msg}: chat_id={chat_id}")
|
||
return {'status': 'error', 'message': error_msg}, 404
|
||
try:
|
||
user.is_blocked = not user.is_blocked
|
||
db.session.commit()
|
||
status_text = "заблокирован" if user.is_blocked else "разблокирован"
|
||
logger.info(f"Пользователь {chat_id} {status_text}")
|
||
|
||
action_type = "block" if user.is_blocked else "unblock"
|
||
auditlog.users(
|
||
action_type=action_type,
|
||
actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id,
|
||
affected_chat_id=chat_id,
|
||
email=user.user_email,
|
||
telegram_id=user.telegram_id,
|
||
)
|
||
return {'status': 'updated', 'new_status': user.is_blocked}, 200
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
error_msg = str(e)
|
||
logger.error(f"Ошибка при переключении блокировки пользователя {chat_id}: {error_msg}")
|
||
auditlog.users(action_type="toggle_block", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id, error=error_msg)
|
||
return {'status': 'error', 'message': error_msg}, 500
|
||
|
||
|
||
def delete_user(chat_id: int, actor_user: Any) -> Tuple[Dict[str, Any], int]:
|
||
logger.info(f"Удаление пользователя: chat_id={chat_id}")
|
||
user: Optional[Users] = Users.query.filter_by(chat_id=chat_id).first()
|
||
if not user:
|
||
error_msg = "Пользователь не найден"
|
||
auditlog.users(action_type="delete", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id, error=error_msg)
|
||
logger.warning(f"{error_msg}: chat_id={chat_id}")
|
||
return {'status': 'error', 'message': error_msg}, 404
|
||
try:
|
||
db.session.delete(user)
|
||
db.session.commit()
|
||
logger.info(f"Пользователь удален: chat_id={chat_id}")
|
||
auditlog.users(
|
||
action_type="delete",
|
||
actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id,
|
||
affected_chat_id=chat_id,
|
||
email=user.user_email,
|
||
telegram_id=user.telegram_id,
|
||
)
|
||
return {'status': 'deleted', 'message': 'Пользователь удален'}, 200
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
error_msg = str(e)
|
||
logger.error(f"Ошибка при удалении пользователя {chat_id}: {error_msg}")
|
||
auditlog.users(action_type="delete", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id, error=error_msg)
|
||
return {'status': 'error', 'message': error_msg}, 500
|
||
|
||
def add_user(user_data: Dict[str, Any], actor_user: Any) -> Tuple[Dict[str, str], int]:
|
||
logger.info(f"Добавление пользователя: {user_data}")
|
||
chat_id = None
|
||
telegram_id = user_data.get('telegram_id')
|
||
user_email = user_data.get('user_email')
|
||
|
||
try:
|
||
try:
|
||
chat_id = int(user_data.get('chat_id'))
|
||
except (ValueError, TypeError):
|
||
error_msg = 'Chat ID должен быть числом'
|
||
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
|
||
telegram_id=telegram_id, email=user_email, error=error_msg)
|
||
logger.warning(error_msg)
|
||
return {'error': error_msg}, 400
|
||
|
||
if not telegram_id or not re.match(r'^@.*$', telegram_id):
|
||
error_msg = "Telegram ID должен начинаться с символа @"
|
||
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
|
||
telegram_id=telegram_id, email=user_email, error=error_msg)
|
||
logger.warning(error_msg)
|
||
return {'error': error_msg}, 400
|
||
if not user_email or not re.match(r'.*@rtmis.ru$', user_email):
|
||
error_msg = "Email должен содержать домен @rtmis.ru"
|
||
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
|
||
telegram_id=telegram_id, email=user_email, error=error_msg)
|
||
logger.warning(error_msg)
|
||
return {'error': error_msg}, 400
|
||
|
||
existing_user = Users.query.filter(
|
||
(Users.user_email == user_email) |
|
||
(Users.telegram_id == telegram_id) |
|
||
(Users.chat_id == chat_id)
|
||
).first()
|
||
|
||
if existing_user:
|
||
error_msg = 'Пользователь с таким Chat ID, Telegram ID или Email уже существует.'
|
||
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
|
||
telegram_id=telegram_id, email=user_email, error=error_msg)
|
||
logger.warning(error_msg)
|
||
return {'error': error_msg}, 409
|
||
|
||
new_user: Users = Users(
|
||
chat_id=chat_id,
|
||
telegram_id=telegram_id,
|
||
user_email=user_email,
|
||
is_blocked=user_data.get('is_blocked', False)
|
||
)
|
||
db.session.add(new_user)
|
||
db.session.commit()
|
||
logger.info(f"Пользователь добавлен успешно: {new_user.user_email}")
|
||
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
|
||
telegram_id=telegram_id, email=user_email)
|
||
try:
|
||
bot_send_message(chat_id,"Регистрация пройдена успешно. нажмите продолжить что бы начать пользоваться ботом!")
|
||
except Exception as e:
|
||
logger.info(f"Ошибка при отправке сообщения {chat_id}: {e}")
|
||
return {'message': 'Пользователь добавлен успешно'}, 201
|
||
except IntegrityError as e:
|
||
db.session.rollback()
|
||
error_msg = 'Ошибка уникальности данных'
|
||
logger.error(f"Ошибка уникальности при добавлении пользователя: {e}")
|
||
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
|
||
telegram_id=telegram_id, email=user_email, error=error_msg)
|
||
return {'error': error_msg}, 409
|
||
except Exception as e:
|
||
db.session.rollback()
|
||
error_msg = f'Ошибка при добавлении пользователя: {type(e).__name__}: {e}'
|
||
logger.error(error_msg)
|
||
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
|
||
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
|
||
telegram_id=telegram_id, email=user_email, error=error_msg)
|
||
return {'error': 'Ошибка при добавлении пользователя'}, 500
|
||
|
||
def search_users(telegram_id: Optional[str] = None, email: Optional[str] = None) -> List[Dict[str, Any]]:
|
||
logger.debug(f"Поиск пользователей: telegram_id={telegram_id}, email={email}")
|
||
|
||
query = db.session.query(Users)
|
||
if telegram_id:
|
||
query = query.filter(Users.telegram_id.ilike(f"%{telegram_id}%"))
|
||
if email:
|
||
query = query.filter(Users.user_email.ilike(f"%{email}%"))
|
||
|
||
users: List[Users] = query.all()
|
||
|
||
users_list: List[Dict[str, Any]] = []
|
||
for user in users:
|
||
# Используем названия полей модели напрямую
|
||
user_data: Dict[str, Any] = {
|
||
'chat_id': user.chat_id,
|
||
'telegram_id': user.telegram_id,
|
||
'email': user.user_email,
|
||
'blocked': user.is_blocked
|
||
}
|
||
users_list.append(user_data)
|
||
|
||
logger.debug(f"Найдено пользователей: {len(users_list)}")
|
||
return users_list |