Telezab/frontend/dashboard.py
2025-03-17 15:30:58 +05:00

178 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, render_template, jsonify, request, redirect, url_for
from flask_login import login_required
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, text
from config import DB_PATH, BASE_URL
from .models import Region # Импортируем модель региона
# Create the blueprints: one for the HTML pages, one for the REST API
bp_dashboard = Blueprint('dashboard', __name__, url_prefix='/telezab/')
bp_api = Blueprint('api', __name__, url_prefix='/telezab/rest/api')
# SQLite engine and session factory used by the route handlers in this module
db_engine = create_engine(f'sqlite:///{DB_PATH}')
Session = sessionmaker(bind=db_engine)
# Routes that render HTML pages
@bp_dashboard.route('/')
# @login_required  # NOTE(review): auth guard is commented out; confirm this is intentional
def dashboard():
    """Render the dashboard landing page from the ``index.html`` template."""
    template_name = 'index.html'
    return render_template(template_name)
@bp_dashboard.route('/users')
# @login_required  # NOTE(review): auth guard is commented out; confirm this is intentional
def users_page():
    """Render the user management page from the ``users.html`` template."""
    template_name = 'users.html'
    return render_template(template_name)
@bp_dashboard.route('/logs')
# @login_required  # NOTE(review): auth guard is commented out; confirm this is intentional
def logs_page():
    """Render the logs page from the ``logs.html`` template."""
    template_name = 'logs.html'
    return render_template(template_name)
@bp_dashboard.route('/regions')
# @login_required  # NOTE(review): auth guard is commented out; confirm this is intentional
def regions_page():
    """Render the regions page from the ``regions.html`` template."""
    template_name = 'regions.html'
    return render_template(template_name)
# REST API routes
@bp_api.route('/users', methods=['GET'])
def get_users():
    """Return one page of whitelisted users together with their subscriptions.

    Query parameters:
        page: 1-based page number (default 1). Values below 1 are treated as 1.
        per_page: page size (default 20). Values below 1 are treated as 1.

    Returns:
        A JSON object with the keys ``users``, ``total_users``,
        ``total_pages``, ``current_page`` and ``per_page``. Each user entry
        holds ``id``, ``username``, ``email``, ``subscriptions`` (region ids
        of the active subscriptions) and ``disaster_only`` (display text).

    NOTE(review): this endpoint has no authentication guard.
    """
    # Clamp both values: per_page == 0 would divide by zero below, and a
    # page below 1 would produce a negative slice start.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = max(request.args.get('per_page', 20, type=int), 1)

    query = text("""
        SELECT w.chat_id, w.username, w.user_email, s.region_id, s.disaster_only
        FROM whitelist w
        LEFT JOIN subscriptions s ON w.chat_id = s.chat_id AND s.active = 1
    """)

    session = Session()
    try:
        rows = session.execute(query).fetchall()
    finally:
        # Release the connection even when the query fails.
        session.close()

    if not rows:
        print("No users found")

    # The join yields one row per (user, active subscription); fold the rows
    # into a single entry per chat_id.
    users_by_chat_id = {}
    for chat_id, username, email, region_id, disaster_only in rows:
        entry = users_by_chat_id.get(chat_id)
        if entry is None:
            # The notification mode is taken from the first row seen for
            # the user.
            if disaster_only == 1:
                disaster_only_text = "Только критические уведомления"
            else:
                disaster_only_text = "Все уведомления"
            entry = {
                'id': chat_id,
                'username': username,
                'email': email,
                'subscriptions': [],
                'disaster_only': disaster_only_text,
            }
            users_by_chat_id[chat_id] = entry
        # region_id is NULL for users without an active subscription.
        if region_id:
            entry['subscriptions'].append(region_id)

    users_list = list(users_by_chat_id.values())
    total_users = len(users_list)
    total_pages = (total_users + per_page - 1) // per_page  # ceiling division
    start = (page - 1) * per_page

    return jsonify({
        'users': users_list[start:start + per_page],
        'total_users': total_users,
        'total_pages': total_pages,
        'current_page': page,
        'per_page': per_page,
    })
@bp_api.route('/regions', methods=['GET', 'POST'])
def manage_regions():
    """List all regions (GET) or create a new region (POST).

    POST expects a JSON object with the keys ``region_id``, ``name`` and,
    optionally, ``active`` (defaults to ``True``).

    Returns:
        GET: a JSON list of ``{'region_id', 'name', 'active'}`` objects.
        POST: ``{'status': 'success'}``, or a 400 response with
        ``{'status': 'error', 'message': ...}`` when the body is not a
        JSON object.

    NOTE(review): this endpoint has no authentication guard.
    """
    session = Session()
    try:
        if request.method == 'POST':
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                # Missing or malformed body: answer 400 instead of crashing
                # on data.get().
                return jsonify({'status': 'error', 'message': 'JSON object expected'}), 400

            region = Region(
                region_id=data.get('region_id'),
                region_name=data.get('name'),
                active=data.get('active', True),
            )
            session.add(region)
            session.commit()
            return jsonify({'status': 'success'})

        regions = session.query(Region).all()
        return jsonify([
            {'region_id': r.region_id, 'name': r.region_name, 'active': r.active}
            for r in regions
        ])
    finally:
        # Runs on every path, including the POST return and any exception,
        # so the session is always released.
        session.close()
@bp_api.route('/regions/<int:region_id>', methods=['PUT', 'DELETE'])
def edit_region(region_id):
    """Update (PUT) or delete (DELETE) the region with the given id.

    PUT expects a JSON object; the keys ``name`` and ``active`` are optional
    and leave the current value unchanged when absent.

    Args:
        region_id: id of the region, taken from the URL.

    Returns:
        ``{'status': 'updated'}`` or ``{'status': 'deleted'}`` on success,
        a 404 response when no such region exists, or a 400 response when
        the PUT body is not a JSON object.

    NOTE(review): this endpoint has no authentication guard.
    """
    session = Session()
    try:
        region = session.query(Region).filter_by(region_id=region_id).first()
        if region is None:
            # Without this guard both branches fail on None and return 500.
            return jsonify({'status': 'error', 'message': 'Region not found'}), 404

        if request.method == 'DELETE':
            session.delete(region)
            session.commit()
            return jsonify({'status': 'deleted'})

        # Only PUT remains (the route accepts PUT and DELETE only).
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'status': 'error', 'message': 'JSON object expected'}), 400

        region.region_name = data.get('name', region.region_name)
        region.active = data.get('active', region.active)
        session.commit()
        return jsonify({'status': 'updated'})
    finally:
        # Release the session on every path, including exceptions.
        session.close()
@bp_api.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Return a single whitelist entry looked up by chat id.

    Args:
        user_id: value matched against ``whitelist.chat_id``, from the URL.

    Returns:
        A JSON object with ``id``, ``username``, ``email`` and ``blocked``,
        or a 404 response with an ``error`` message when no row matches.

    NOTE(review): this endpoint has no authentication guard.
    """
    session = Session()
    try:
        # Select only the columns that are returned below.
        user = session.execute(
            text(
                "SELECT chat_id, username, user_email, is_blocked "
                "FROM whitelist WHERE chat_id = :id"
            ),
            {'id': user_id},
        ).fetchone()
    finally:
        # Release the connection even when the query fails.
        session.close()

    if user is None:
        return jsonify({'error': 'Пользователь не найден'}), 404

    return jsonify({
        'id': user.chat_id,
        'username': user.username,
        'email': user.user_email,
        'blocked': user.is_blocked,
    })
# @bp_api.route('/users/<int:user_id>/block', methods=['POST'])
# def block_user(user_id):
# session = Session()
# session.execute(text("UPDATE whitelist SET is_blocked = False WHERE chat_id = :id"), {'id': user_id})
# session.commit()
# session.close()
# return jsonify({'status': 'updated'})
@bp_api.route('/users/<int:user_id>/block', methods=['POST'])
def block_user(user_id):
    """Toggle the ``is_blocked`` flag of a whitelist entry.

    A blocked user becomes unblocked and vice versa.

    Args:
        user_id: value matched against ``whitelist.chat_id``, from the URL.

    Returns:
        ``{'status': 'updated', 'new_status': <bool>}`` on success, or a 404
        response with ``{'status': 'error', 'message': 'User not found'}``.

    NOTE(review): this endpoint has no authentication guard. The flag is
    read and then written in two statements, so concurrent requests for the
    same user may both observe the same starting value.
    """
    session = Session()
    try:
        # Read the current block status of the user.
        row = session.execute(
            text("SELECT is_blocked FROM whitelist WHERE chat_id = :id"),
            {'id': user_id},
        ).fetchone()
        if row is None:
            return jsonify({'status': 'error', 'message': 'User not found'}), 404

        # Invert the flag: blocked -> unblocked, unblocked -> blocked.
        new_status = not row[0]
        session.execute(
            text("UPDATE whitelist SET is_blocked = :new_status WHERE chat_id = :id"),
            {'new_status': new_status, 'id': user_id},
        )
        session.commit()
        return jsonify({'status': 'updated', 'new_status': new_status})
    finally:
        # Release the session on every path, including exceptions.
        session.close()
@bp_api.route('/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
    """Delete a whitelist entry by chat id.

    Args:
        user_id: value matched against ``whitelist.chat_id``, from the URL.

    Returns:
        ``{'status': 'deleted'}`` when a row was removed, or a 404 response
        with ``{'status': 'error', 'message': 'User not found'}`` when no
        row matched (consistent with the other user endpoints).

    NOTE(review): this endpoint has no authentication guard. Only the
    ``whitelist`` row is removed here; confirm whether related
    ``subscriptions`` rows need to be cleaned up as well.
    """
    session = Session()
    try:
        result = session.execute(
            text("DELETE FROM whitelist WHERE chat_id = :id"),
            {'id': user_id},
        )
        # Read the count before commit, while the result is still current.
        deleted_rows = result.rowcount
        session.commit()
    finally:
        # Release the session on every path, including exceptions.
        session.close()

    # Only an explicit zero means "nothing matched"; a driver that cannot
    # report a count is treated as success, as before.
    if deleted_rows == 0:
        return jsonify({'status': 'error', 'message': 'User not found'}), 404
    return jsonify({'status': 'deleted'})