Telezab/app/services/auth_service.py
UdoChudo d5f9d3be49
All checks were successful
Build and Push Docker Images / build (push) Successful in 1m21s
fix: (auth page) initialize checking ldap available before login connection to display correct message instean "Invalid username or password"
Signed-off-by: UdoChudo <stream@udochudo.ru>
2025-06-22 23:23:31 +05:00

83 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import current_app
from flask_ldap3_login import LDAP3LoginManager, AuthenticationResponseStatus
from ldap3.core.exceptions import LDAPSocketOpenError, LDAPServerPoolExhaustedError
from werkzeug.middleware.proxy_fix import ProxyFix
import config
def init_ldap(app):
app.config['LDAP_HOST'] = config.LDAP_HOST
app.config['LDAP_PORT'] = config.LDAP_PORT
app.config['LDAP_USE_SSL'] = config.LDAP_USE_SSL
app.config['LDAP_BASE_DN'] = config.LDAP_BASE_DN
app.config['LDAP_BIND_DIRECT_CREDENTIALS'] = False
app.config['LDAP_BIND_USER_DN'] = config.LDAP_BIND_USER_DN
app.config['LDAP_BIND_USER_PASSWORD'] = config.LDAP_USER_PASSWORD
app.config['LDAP_USER_DN'] = config.LDAP_USER_DN
app.config['LDAP_USER_PASSWORD'] = config.LDAP_USER_PASSWORD
app.config['LDAP_USER_OBJECT_FILTER'] = config.LDAP_USER_OBJECT_FILTER
app.config['LDAP_USER_LOGIN_ATTR'] = config.LDAP_USER_LOGIN_ATTR
app.config['LDAP_USER_SEARCH_SCOPE'] = config.LDAP_USER_SEARCH_SCOPE
app.config['LDAP_SCHEMA'] = config.LDAP_SCHEMA
ldap_manager = LDAP3LoginManager(app)
app.extensions['ldap3_login'] = ldap_manager
ldap_manager.init_app(app)
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1)
def authenticate_user(username, password):
ldap_manager = current_app.extensions['ldap3_login']
try:
# Явно пытаемся подключиться к LDAP
conn = ldap_manager.connection
if not conn.bind():
current_app.logger.error(f"Ошибка соединения с LDAP: {conn.last_error}")
return False, None, "LDAP-сервер недоступен. Повторите попытку позже."
except LDAPServerPoolExhaustedError as e:
current_app.logger.error(f"LDAP сервер недоступен: {e}")
return False, None, "LDAP-сервер не отвечает. Повторите попытку позже."
except LDAPSocketOpenError as e:
current_app.logger.error(f"Ошибка подключения к LDAP: {e}")
return False, None, "Ошибка подключения к LDAP-серверу. Повторите попытку позже."
except Exception as e:
current_app.logger.exception("Непредвиденная ошибка при подключении к LDAP")
return False, None, "Внутренняя ошибка при подключении к LDAP."
try:
response = ldap_manager.authenticate(username, password)
except Exception as e:
current_app.logger.exception("Ошибка при попытке аутентификации")
return False, None, "Ошибка при выполнении аутентификации."
if response.status == AuthenticationResponseStatus.success:
return True, response.user_info, None
elif response.status == AuthenticationResponseStatus.fail:
return False, None, "Неверное имя пользователя или пароль."
else:
return False, None, f"LDAP ошибка: {response.status}"
def parse_ldap_user(user_info):
def get(attr):
value = user_info.get(attr)
if isinstance(value, list) and value:
return str(value[0])
elif value:
return str(value)
else:
return None
return {
'sam_account_name': get("sAMAccountName"),
'email': get("mail"),
'user_name': get("givenName"),
'user_middle_name': get("middleName"),
'user_surname': get("sn"),
}