from typing import Optional from flask import request from flask_login import current_user from app.models import AuditLog class AuditLogger: def __init__(self, session): self.session = session def _save_log(self, ldap_user_id, username, action, details, ipaddress=None): username = username or "Anonymous" ldap_user_id = ldap_user_id or "anonymous" ipaddress = ipaddress or request.headers.get('X-Forwarded-For', request.remote_addr) log_entry = AuditLog( ldap_user_id=ldap_user_id, username=username, action=action, details=details, ipaddress=ipaddress ) self.session.add(log_entry) self.session.commit() def auth(self, username_attempted, success: bool, ldap_user_id=None, display_name=None, error=None): action = "Авторизация" if success else "Ошибка авторизации" details = f"{'Успешный вход' if success else 'Неудачная попытка входа'}: {display_name or username_attempted}" if error: details += f". Ошибка: {error}" self._save_log( ldap_user_id=ldap_user_id or username_attempted, username=display_name if success else "Anonymous", action=action, details=details ) def users(self, action_type: str, actor_display_name: Optional[str], ldap_user_id: str, affected_chat_id: Optional[int] = None, email: Optional[str] = None, telegram_id: Optional[str] = None, error: Optional[str] = None): action_map = { "add": "Добавление пользователя", "delete": "Удаление пользователя", "block": "Блокировка пользователя", "unblock": "Разблокировка пользователя" } if action_type not in action_map: raise ValueError(f"Недопустимое действие логирования: {action_type}") details = self._compose_details( telegram_id=telegram_id, affected_chat_id=affected_chat_id, email=email, error=error, fallback="Данные пользователя отсутствуют" if not current_user.is_authenticated else None ) self._save_log( ldap_user_id=ldap_user_id, username=actor_display_name, action=action_map[action_type], details=details ) def systems(self, action_type: str, actor_display_name: Optional[str], ldap_user_id: str, system_id: Optional[str] = None, name: Optional[str] = None, error: Optional[str] = None): action_map = { "add": "Добавление системы", "update": "Изменение имени системы", "delete": "Удаление системы" } if action_type not in action_map: raise ValueError(f"Недопустимое действие логирования: {action_type}") details = self._compose_details( system_id=system_id, name=name, error=error ) self._save_log( ldap_user_id=ldap_user_id, username=actor_display_name, action=action_map[action_type], details=details ) def regions(self, action_type: str, actor_display_name: Optional[str], ldap_user_id: str, region_id: Optional[str] = None, name: Optional[str] = None, new_name: Optional[str] = None, old_name: Optional[str] = None, active: Optional[bool] = None, old_active: Optional[bool] = None, error: Optional[str] = None, ): action_map = { "add": "Добавление региона", "rename": "Изменение имени региона", "toggle": "Изменение статуса региона", "delete": "Удаление региона" } if action_type not in action_map: raise ValueError(f"Недопустимое действие логирования: {action_type}") if action_type == "rename": if old_name is not None and new_name is not None: details = f"Region id: {region_id}; Rename: {old_name} → {new_name}" else: details = self._compose_details( region_id=region_id, new_name=new_name, old_name=old_name, error=error ) elif action_type == "toggle": if old_active is not None and active is not None: old_status_str = "Активен" if old_active else "Отключён" new_status_str = "Активен" if active else "Отключён" details = f"Region id: {region_id}; Status: {old_status_str} → {new_status_str}" else: status_str = "Активен" if active else "Отключён" if active is not None else None details = self._compose_details( region_id=region_id, status=status_str, error=error ) else: status_str = "Активен" if active else "Отключён" if active is not None else None details = self._compose_details( region_id=region_id, name=name, status=status_str, error=error ) self._save_log( ldap_user_id=ldap_user_id, username=actor_display_name, action=action_map[action_type], details=details ) def get_web_action_logs(self, page, per_page, ldap_user_id_filter=None, action_filter=None): query = AuditLog.query.order_by(AuditLog.timestamp.desc()) if ldap_user_id_filter: query = query.filter_by(ldap_user_id=ldap_user_id_filter) if action_filter: query = query.filter(AuditLog.action.like(f'%{action_filter}%')) pagination = query.paginate(page=page, per_page=per_page) return { 'logs': [ { 'id': log.id, 'ldap_user_id': log.ldap_user_id, 'username': log.username, 'timestamp': log.timestamp.isoformat(), 'action': log.action, 'details': log.details } for log in pagination.items ], 'total': pagination.total, 'pages': pagination.pages, 'current_page': pagination.page, 'per_page': pagination.per_page } @staticmethod def _compose_details(**kwargs) -> str: parts = [] for key, value in kwargs.items(): if value is None: continue if key == "fallback": parts.append(value) else: key_display = key.replace("_", " ").capitalize() parts.append(f"{key_display}: {value}") return "; ".join(parts)