Telezab/app/extensions/audit_logger.py
UdoChudo 52e31864b3 feat: Develop web interface
- Implemented the initial version of the web interface.
refactor: Begin Telegram bot refactoring
- Started restructuring the bot’s code for better maintainability.
chore: Migrate to Flask project structure
- Reorganized the application to follow Flask's project structure.
cleanup: Extensive code cleanup
- Removed redundant code and improved readability.

Signed-off-by: UdoChudo <stream@udochudo.ru>
2025-06-10 14:39:11 +05:00

191 lines
7.3 KiB
Python

from typing import Optional
from flask import request
from flask_login import current_user
from app.models import AuditLog
class AuditLogger:
def __init__(self, session):
self.session = session
def _save_log(self, ldap_user_id, username, action, details, ipaddress=None):
username = username or "Anonymous"
ldap_user_id = ldap_user_id or "anonymous"
ipaddress = ipaddress or request.headers.get('X-Forwarded-For', request.remote_addr)
log_entry = AuditLog(
ldap_user_id=ldap_user_id,
username=username,
action=action,
details=details,
ipaddress=ipaddress
)
self.session.add(log_entry)
self.session.commit()
def auth(self, username_attempted, success: bool, ldap_user_id=None, display_name=None, error=None):
action = "Авторизация" if success else "Ошибка авторизации"
details = f"{'Успешный вход' if success else 'Неудачная попытка входа'}: {display_name or username_attempted}"
if error:
details += f". Ошибка: {error}"
self._save_log(
ldap_user_id=ldap_user_id or username_attempted,
username=display_name if success else "Anonymous",
action=action,
details=details
)
def users(self, action_type: str, actor_display_name: Optional[str], ldap_user_id: str,
affected_chat_id: Optional[int] = None, email: Optional[str] = None,
telegram_id: Optional[str] = None, error: Optional[str] = None):
action_map = {
"add": "Добавление пользователя",
"delete": "Удаление пользователя",
"block": "Блокировка пользователя",
"unblock": "Разблокировка пользователя"
}
if action_type not in action_map:
raise ValueError(f"Недопустимое действие логирования: {action_type}")
details = self._compose_details(
telegram_id=telegram_id,
affected_chat_id=affected_chat_id,
email=email,
error=error,
fallback="Данные пользователя отсутствуют" if not current_user.is_authenticated else None
)
self._save_log(
ldap_user_id=ldap_user_id,
username=actor_display_name,
action=action_map[action_type],
details=details
)
def systems(self, action_type: str, actor_display_name: Optional[str], ldap_user_id: str,
system_id: Optional[str] = None, name: Optional[str] = None, error: Optional[str] = None):
action_map = {
"add": "Добавление системы",
"update": "Изменение имени системы",
"delete": "Удаление системы"
}
if action_type not in action_map:
raise ValueError(f"Недопустимое действие логирования: {action_type}")
details = self._compose_details(
system_id=system_id,
name=name,
error=error
)
self._save_log(
ldap_user_id=ldap_user_id,
username=actor_display_name,
action=action_map[action_type],
details=details
)
def regions(self,
action_type: str,
actor_display_name: Optional[str],
ldap_user_id: str,
region_id: Optional[str] = None,
name: Optional[str] = None,
new_name: Optional[str] = None,
old_name: Optional[str] = None,
active: Optional[bool] = None,
old_active: Optional[bool] = None,
error: Optional[str] = None,
):
action_map = {
"add": "Добавление региона",
"rename": "Изменение имени региона",
"toggle": "Изменение статуса региона",
"delete": "Удаление региона"
}
if action_type not in action_map:
raise ValueError(f"Недопустимое действие логирования: {action_type}")
if action_type == "rename":
if old_name is not None and new_name is not None:
details = f"Region id: {region_id}; Rename: {old_name}{new_name}"
else:
details = self._compose_details(
region_id=region_id,
new_name=new_name,
old_name=old_name,
error=error
)
elif action_type == "toggle":
if old_active is not None and active is not None:
old_status_str = "Активен" if old_active else "Отключён"
new_status_str = "Активен" if active else "Отключён"
details = f"Region id: {region_id}; Status: {old_status_str}{new_status_str}"
else:
status_str = "Активен" if active else "Отключён" if active is not None else None
details = self._compose_details(
region_id=region_id,
status=status_str,
error=error
)
else:
status_str = "Активен" if active else "Отключён" if active is not None else None
details = self._compose_details(
region_id=region_id,
name=name,
status=status_str,
error=error
)
self._save_log(
ldap_user_id=ldap_user_id,
username=actor_display_name,
action=action_map[action_type],
details=details
)
def get_web_action_logs(self, page, per_page, ldap_user_id_filter=None, action_filter=None):
query = AuditLog.query.order_by(AuditLog.timestamp.desc())
if ldap_user_id_filter:
query = query.filter_by(ldap_user_id=ldap_user_id_filter)
if action_filter:
query = query.filter(AuditLog.action.like(f'%{action_filter}%'))
pagination = query.paginate(page=page, per_page=per_page)
return {
'logs': [
{
'id': log.id,
'ldap_user_id': log.ldap_user_id,
'username': log.username,
'timestamp': log.timestamp.isoformat(),
'action': log.action,
'details': log.details
} for log in pagination.items
],
'total': pagination.total,
'pages': pagination.pages,
'current_page': pagination.page,
'per_page': pagination.per_page
}
@staticmethod
def _compose_details(**kwargs) -> str:
parts = []
for key, value in kwargs.items():
if value is None:
continue
if key == "fallback":
parts.append(value)
else:
key_display = key.replace("_", " ").capitalize()
parts.append(f"{key_display}: {value}")
return "; ".join(parts)