Telezab/app/services/users_service.py
UdoChudo ccb47d527f
All checks were successful
Build and Push Docker Images / build (push) Successful in 1m28s
refactor: modularize Telegram bot and add RabbitMQ client foundation
- Рефакторинг Telegram бота на модульную структуру для удобства поддержки и расширения
- Создан общий RabbitMQ клиент для Flask и Telegram компонентов
- Подготовлена базовая архитектура для будущего масштабирования и новых функций

Signed-off-by: UdoChudo <stream@udochudo.ru>
2025-06-16 09:08:46 +05:00

236 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# app/services/user_service.py
import logging
import re
from typing import Dict, List, Optional, Tuple, Any
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload
from app import db
from app.extensions.bot_send import bot_send_message
from app.models import Users # Предполагаем, что app.models/__init__.py экспортирует Users
from app.extensions.audit_logger import AuditLogger
logger = logging.getLogger(__name__)
auditlog = AuditLogger(db.session)
def get_users(page: int, per_page: int) -> Dict[str, Any]:
logger.debug(f"Получение пользователей: page={page}, per_page={per_page}")
users_query = Users.query.options(joinedload(Users.subscriptions))
users_paginated = users_query.paginate(page=page, per_page=per_page, error_out=False)
users_list: List[Dict[str, Any]] = []
for user in users_paginated.items:
user_data: Dict[str, Any] = {
'chat_id': user.chat_id,
'telegram_id': user.telegram_id,
'email': user.user_email,
'subscriptions': [],
'disaster_only': "Все уведомления",
'status': "Активен" if not user.is_blocked else "Заблокирован",
'blocked': user.is_blocked
}
if user.subscriptions:
for subscription in user.subscriptions:
if subscription.active and not subscription.skip:
user_data['subscriptions'].append(subscription.region_id)
if subscription.disaster_only:
user_data['disaster_only'] = "Только критические уведомления"
users_list.append(user_data)
logger.debug(f"Получено пользователей: {len(users_list)} элементов")
return {
'users': users_list,
'total_users': users_paginated.total,
'total_pages': users_paginated.pages,
'current_page': users_paginated.page,
'per_page': users_paginated.per_page
}
def get_user(chat_id: int) -> Optional[Dict[str, Any]]:
logger.debug(f"Получение пользователя: chat_id={chat_id}")
user: Optional[Users] = Users.query.filter_by(chat_id=chat_id).first()
if user:
user_data: Dict[str, Any] = {
'chat_id': user.chat_id,
'telegram_id': user.telegram_id,
'email': user.user_email,
'blocked': user.is_blocked
}
logger.debug(f"Пользователь найден: chat_id={chat_id}")
return user_data
else:
logger.warning(f"Пользователь не найден: chat_id={chat_id}")
return None
def toggle_block_user(chat_id: int, actor_user: Any) -> Tuple[Dict[str, Any], int]:
logger.debug(f"Переключение блокировки пользователя: chat_id={chat_id}")
user: Optional[Users] = Users.query.filter_by(chat_id=chat_id).first()
if not user:
error_msg = "Пользователь не найден"
auditlog.users(action_type="toggle_block", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id, error=error_msg)
logger.warning(f"{error_msg}: chat_id={chat_id}")
return {'status': 'error', 'message': error_msg}, 404
try:
user.is_blocked = not user.is_blocked
db.session.commit()
status_text = "заблокирован" if user.is_blocked else "разблокирован"
logger.info(f"Пользователь {chat_id} {status_text}")
action_type = "block" if user.is_blocked else "unblock"
auditlog.users(
action_type=action_type,
actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id,
affected_chat_id=chat_id,
email=user.user_email,
telegram_id=user.telegram_id,
)
return {'status': 'updated', 'new_status': user.is_blocked}, 200
except Exception as e:
db.session.rollback()
error_msg = str(e)
logger.error(f"Ошибка при переключении блокировки пользователя {chat_id}: {error_msg}")
auditlog.users(action_type="toggle_block", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id, error=error_msg)
return {'status': 'error', 'message': error_msg}, 500
def delete_user(chat_id: int, actor_user: Any) -> Tuple[Dict[str, Any], int]:
logger.info(f"Удаление пользователя: chat_id={chat_id}")
user: Optional[Users] = Users.query.filter_by(chat_id=chat_id).first()
if not user:
error_msg = "Пользователь не найден"
auditlog.users(action_type="delete", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id, error=error_msg)
logger.warning(f"{error_msg}: chat_id={chat_id}")
return {'status': 'error', 'message': error_msg}, 404
try:
db.session.delete(user)
db.session.commit()
logger.info(f"Пользователь удален: chat_id={chat_id}")
auditlog.users(
action_type="delete",
actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id,
affected_chat_id=chat_id,
email=user.user_email,
telegram_id=user.telegram_id,
)
return {'status': 'deleted', 'message': 'Пользователь удален'}, 200
except Exception as e:
db.session.rollback()
error_msg = str(e)
logger.error(f"Ошибка при удалении пользователя {chat_id}: {error_msg}")
auditlog.users(action_type="delete", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id, error=error_msg)
return {'status': 'error', 'message': error_msg}, 500
def add_user(user_data: Dict[str, Any], actor_user: Any) -> Tuple[Dict[str, str], int]:
logger.info(f"Добавление пользователя: {user_data}")
chat_id = None
telegram_id = user_data.get('telegram_id')
user_email = user_data.get('user_email')
try:
try:
chat_id = int(user_data.get('chat_id'))
except (ValueError, TypeError):
error_msg = 'Chat ID должен быть числом'
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
telegram_id=telegram_id, email=user_email, error=error_msg)
logger.warning(error_msg)
return {'error': error_msg}, 400
if not telegram_id or not re.match(r'^@.*$', telegram_id):
error_msg = "Telegram ID должен начинаться с символа @"
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
telegram_id=telegram_id, email=user_email, error=error_msg)
logger.warning(error_msg)
return {'error': error_msg}, 400
if not user_email or not re.match(r'.*@rtmis.ru$', user_email):
error_msg = "Email должен содержать домен @rtmis.ru"
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
telegram_id=telegram_id, email=user_email, error=error_msg)
logger.warning(error_msg)
return {'error': error_msg}, 400
existing_user = Users.query.filter(
(Users.user_email == user_email) |
(Users.telegram_id == telegram_id) |
(Users.chat_id == chat_id)
).first()
if existing_user:
error_msg = 'Пользователь с таким Chat ID, Telegram ID или Email уже существует.'
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
telegram_id=telegram_id, email=user_email, error=error_msg)
logger.warning(error_msg)
return {'error': error_msg}, 409
new_user: Users = Users(
chat_id=chat_id,
telegram_id=telegram_id,
user_email=user_email,
is_blocked=user_data.get('is_blocked', False)
)
db.session.add(new_user)
db.session.commit()
logger.info(f"Пользователь добавлен успешно: {new_user.user_email}")
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
telegram_id=telegram_id, email=user_email)
try:
bot_send_message(chat_id,"Регистрация пройдена успешно. нажмите продолжить что бы начать пользоваться ботом!")
except Exception as e:
logger.info(f"Ошибка при отправке сообщения {chat_id}: {e}")
return {'message': 'Пользователь добавлен успешно'}, 201
except IntegrityError as e:
db.session.rollback()
error_msg = 'Ошибка уникальности данных'
logger.error(f"Ошибка уникальности при добавлении пользователя: {e}")
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
telegram_id=telegram_id, email=user_email, error=error_msg)
return {'error': error_msg}, 409
except Exception as e:
db.session.rollback()
error_msg = f'Ошибка при добавлении пользователя: {type(e).__name__}: {e}'
logger.error(error_msg)
auditlog.users(action_type="add", actor_display_name=actor_user.display_name,
ldap_user_id=actor_user.id, affected_chat_id=chat_id,
telegram_id=telegram_id, email=user_email, error=error_msg)
return {'error': 'Ошибка при добавлении пользователя'}, 500
def search_users(telegram_id: Optional[str] = None, email: Optional[str] = None) -> List[Dict[str, Any]]:
logger.debug(f"Поиск пользователей: telegram_id={telegram_id}, email={email}")
query = db.session.query(Users)
if telegram_id:
query = query.filter(Users.telegram_id.ilike(f"%{telegram_id}%"))
if email:
query = query.filter(Users.user_email.ilike(f"%{email}%"))
users: List[Users] = query.all()
users_list: List[Dict[str, Any]] = []
for user in users:
# Используем названия полей модели напрямую
user_data: Dict[str, Any] = {
'chat_id': user.chat_id,
'telegram_id': user.telegram_id,
'email': user.user_email,
'blocked': user.is_blocked
}
users_list.append(user_data)
logger.debug(f"Найдено пользователей: {len(users_list)}")
return users_list