129 lines
5.5 KiB
Python
129 lines
5.5 KiB
Python
import logging
|
|
from flask import Blueprint, render_template, request, redirect, url_for, flash, session, current_app
|
|
from flask_ldap3_login import LDAP3LoginManager, AuthenticationResponseStatus
|
|
from flask_login import LoginManager, login_user, UserMixin, logout_user, current_user
|
|
from datetime import timedelta
|
|
|
|
import config
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
|
|
bp_auth = Blueprint('auth', __name__, url_prefix='/telezab/')
|
|
|
|
login_manager = LoginManager()
|
|
logging.getLogger('flask-login').setLevel(logging.DEBUG)
|
|
logging.getLogger('flask_ldap3_login').setLevel(logging.DEBUG)
|
|
logging.getLogger('ldap3').setLevel(logging.DEBUG)
|
|
|
|
class User(UserMixin):
|
|
def __init__(self, user_id, user_name=None, user_surname=None, user_middle_name=None,display_name=None, email=None):
|
|
self.id = str(user_id)
|
|
self.user_name = user_name
|
|
self.user_surname = user_surname
|
|
self.user_middle_name = user_middle_name
|
|
self.display_name = display_name
|
|
self.email = email
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
logging.debug(f"load_user called for user_id: {user_id}")
|
|
display_name = session.get('display_name') # Получаем display_name из сессии
|
|
return User(user_id, display_name=display_name)
|
|
|
|
@bp_auth.record_once
|
|
def on_load(state):
|
|
login_manager.init_app(state.app)
|
|
login_manager.login_view = 'auth.login'
|
|
init_ldap(state.app)
|
|
|
|
|
|
def init_ldap(app):
|
|
app.config['LDAP_HOST'] = config.LDAP_HOST
|
|
app.config['LDAP_PORT'] = config.LDAP_PORT
|
|
app.config['LDAP_USE_SSL'] = config.LDAP_USE_SSL
|
|
app.config['LDAP_BASE_DN'] = config.LDAP_BASE_DN
|
|
app.config['LDAP_BIND_DIRECT_CREDENTIALS'] = False
|
|
app.config['LDAP_BIND_USER_DN'] = config.LDAP_BIND_USER_DN
|
|
app.config['LDAP_BIND_USER_PASSWORD'] = config.LDAP_USER_PASSWORD
|
|
app.config['LDAP_USER_DN'] = config.LDAP_USER_DN
|
|
app.config['LDAP_USER_PASSWORD'] = config.LDAP_USER_PASSWORD
|
|
app.config['LDAP_USER_OBJECT_FILTER'] = config.LDAP_USER_OBJECT_FILTER
|
|
app.config['LDAP_USER_LOGIN_ATTR'] = config.LDAP_USER_LOGIN_ATTR
|
|
app.config['LDAP_USER_SEARCH_SCOPE'] = config.LDAP_USER_SEARCH_SCOPE
|
|
app.config['LDAP_SCHEMA'] = config.LDAP_SCHEMA
|
|
|
|
ldap_manager = LDAP3LoginManager(app)
|
|
app.extensions['ldap3_login'] = ldap_manager
|
|
ldap_manager.init_app(app)
|
|
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
|
|
|
|
def get_attr(user_info, attr_name):
|
|
try:
|
|
value = user_info.get(attr_name)
|
|
if isinstance(value, list) and value:
|
|
return str(value[0])
|
|
elif value:
|
|
return str(value)
|
|
else:
|
|
return None
|
|
except Exception as e:
|
|
logging.error(f"Error getting attribute {attr_name}: {e}")
|
|
return None
|
|
|
|
@bp_auth.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if 'user_id' in session:
|
|
return redirect(url_for('dashboard.dashboard'))
|
|
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
ldap_manager = current_app.extensions['ldap3_login']
|
|
|
|
try:
|
|
ldap_response = ldap_manager.authenticate(username, password)
|
|
logging.debug(f"ldap_response.status: {ldap_response.status}")
|
|
|
|
if ldap_response.status == AuthenticationResponseStatus.success:
|
|
user_info = ldap_response.user_info
|
|
logging.debug(f"user_info: {user_info}")
|
|
|
|
if not user_info:
|
|
logging.error("LDAP authentication succeeded but no user info was returned.")
|
|
flash("Failed to retrieve user details from LDAP.", "danger")
|
|
return render_template("login.html")
|
|
|
|
sam_account_name = get_attr(user_info, "sAMAccountName")
|
|
# display_name = get_attr(user_info, "displayName")
|
|
email = get_attr(user_info, "mail")
|
|
user_name = get_attr(user_info, "givenName")
|
|
user_middle_name = get_attr(user_info, "middleName")
|
|
user_surname = get_attr(user_info, "sn")
|
|
display_name = f"{user_surname} {user_name} {user_middle_name}"
|
|
user = User(user_id=sam_account_name,
|
|
user_name=user_name,
|
|
user_surname=user_surname,
|
|
user_middle_name=user_middle_name,
|
|
display_name=display_name,
|
|
email=email
|
|
)
|
|
|
|
session.permanent = True
|
|
session['username'] = sam_account_name
|
|
session['display_name'] = display_name # Сохраняем display_name в сессии
|
|
login_user(user)
|
|
logging.debug(f"current_user: {current_user.__dict__}")
|
|
logging.info(f"User {user.id} logged in successfully.")
|
|
# log_user_action(action='Успешная авторизация', details=f'Username: {username}') # Логируем успешную авторизацию
|
|
flash("Logged in successfully!", "success")
|
|
return redirect(url_for("dashboard.dashboard"))
|
|
|
|
elif ldap_response.status == AuthenticationResponseStatus.fail:
|
|
flash('Invalid username or password.', 'danger')
|
|
else:
|
|
flash(f"LDAP Error: {ldap_response.status}", 'danger')
|
|
|
|
except Exception as e:
|
|
logging.error(f"Unexpected error during login: {e}")
|
|
flash("An unexpected error occurred. Please try again.", 'danger')
|
|
|
|
return render_template('login.html') |