Files
MClogger/web/blueprints/site_admin.py
SimolZimol 8f614a08cc modified: web/blueprints/group_admin.py
modified:   web/blueprints/site_admin.py
	modified:   web/mailer.py
2026-04-13 19:10:21 +02:00

571 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MCLogger Site-Admin-Bereich
Verwaltet alle Gruppen und Nutzer global.
"""
from functools import wraps
from datetime import datetime, timedelta
from flask import Blueprint, render_template, request, redirect, url_for, session, flash
from config import Config
from mailer import send_mail, build_invite_email, force_https_url
import panel_db as db
from roles import GROUP_MANAGEMENT_ROLES, GROUP_ROLE_OPTIONS, GROUP_ROLE_SET, role_label
site_admin = Blueprint("site_admin", __name__, url_prefix="/admin")
def admin_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not session.get("is_site_admin"):
return redirect(url_for("auth.admin_login"))
return f(*args, **kwargs)
return decorated
# ──────────────────────────────────────────────────────────────
# Dashboard
# ──────────────────────────────────────────────────────────────
@site_admin.route("/")
@admin_required
def dashboard():
try:
groups = db.list_all_groups() or []
users = db.list_all_users() or []
has_mail = db.has_site_mail_settings()
for g in groups:
try:
g["has_db"] = db.has_db_configured(g["id"])
except Exception:
g["has_db"] = False
except Exception:
groups, users, has_mail = [], [], False
stats = {
"group_count": len(groups),
"user_count": len(users),
"db_configured": sum(1 for g in groups if g.get("has_db")),
"admin_count": sum(1 for u in users if u.get("is_site_admin")),
"mail_configured": int(has_mail),
}
return render_template("admin/dashboard.html", groups=groups, users=users, stats=stats)
@site_admin.route("/mail", methods=["GET", "POST"])
@admin_required
def mail_settings():
settings = db.get_site_mail_settings()
error = None
if request.method == "POST":
host = request.form.get("host", "").strip()
port = request.form.get("port", type=int) or 0
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
from_email = request.form.get("from_email", "").strip()
from_name = request.form.get("from_name", "").strip()
use_tls = request.form.get("use_tls") == "1"
action = request.form.get("action", "save")
test_recipient = request.form.get("test_recipient", "").strip()
if settings and not password:
password = settings["password"]
if not all([host, port, username, password, from_email]):
error = "Host, port, username, password and from email are required."
elif "@" not in from_email:
error = "Please provide a valid from email address."
else:
candidate = {
"host": host,
"port": port,
"username": username,
"password": password,
"from_email": from_email,
"from_name": from_name,
"use_tls": use_tls,
}
try:
if action == "test":
send_mail(
candidate,
test_recipient or from_email,
"MCLogger SMTP Test",
"This is a test email from your MCLogger admin panel.",
)
flash("Test email sent successfully.", "success")
else:
send_mail(
candidate,
test_recipient or from_email,
"MCLogger SMTP Verification",
"Your SMTP settings were verified successfully and have been saved.",
)
db.set_site_mail_settings(host, port, username, password, from_email, from_name, use_tls)
flash("Mail settings saved and verified.", "success")
return redirect(url_for("site_admin.mail_settings"))
except Exception as exc:
error = f"SMTP connection failed: {exc}"
settings = {
"host": host,
"port": port,
"username": username,
"password": password,
"from_email": from_email,
"from_name": from_name,
"use_tls": use_tls,
}
return render_template("admin/mail_settings.html", settings=settings, error=error)
@site_admin.route("/mail/delete", methods=["POST"])
@admin_required
def mail_settings_delete():
db.delete_site_mail_settings()
flash("Mail settings removed.", "success")
return redirect(url_for("site_admin.mail_settings"))
# ──────────────────────────────────────────────────────────────
# Gruppen verwalten
# ──────────────────────────────────────────────────────────────
@site_admin.route("/groups")
@admin_required
def groups():
all_groups = db.list_all_groups()
for g in all_groups:
g["has_db"] = db.has_db_configured(g["id"])
return render_template("admin/groups.html", groups=all_groups)
@site_admin.route("/groups/new", methods=["GET", "POST"])
@admin_required
def group_new():
if request.method == "POST":
name = request.form.get("name", "").strip()
desc = request.form.get("description", "").strip()
if not name:
flash("Group name must not be empty.", "danger")
elif db.get_group_by_name(name):
flash("A group with that name already exists.", "danger")
else:
db.create_group(name, desc)
flash(f"Group '{name}' created.", "success")
return redirect(url_for("site_admin.groups"))
return render_template("admin/group_edit.html", group=None)
@site_admin.route("/groups/<int:group_id>/edit", methods=["GET", "POST"])
@admin_required
def group_edit(group_id):
group = db.get_group_by_id(group_id)
if not group:
flash("Group not found.", "danger")
return redirect(url_for("site_admin.groups"))
if request.method == "POST":
name = request.form.get("name", "").strip()
desc = request.form.get("description", "").strip()
if not name:
flash("Group name must not be empty.", "danger")
else:
db.update_group(group_id, name, desc)
flash("Group updated.", "success")
return redirect(url_for("site_admin.groups"))
return render_template("admin/group_edit.html", group=group)
@site_admin.route("/groups/<int:group_id>/delete", methods=["POST"])
@admin_required
def group_delete(group_id):
db.delete_group(group_id)
flash("Group deleted.", "success")
return redirect(url_for("site_admin.groups"))
@site_admin.route("/groups/<int:group_id>/members")
@admin_required
def group_members(group_id):
group = db.get_group_by_id(group_id)
members = db.get_group_members(group_id)
pending_invites = db.list_active_group_invites(group_id)
all_users = db.list_all_users()
member_ids = {m["id"] for m in members}
non_members = [u for u in all_users if u["id"] not in member_ids]
return render_template("admin/group_members.html",
group=group, members=members, non_members=non_members,
pending_invites=pending_invites,
role_options=GROUP_ROLE_OPTIONS,
role_label=role_label,
management_roles=GROUP_MANAGEMENT_ROLES)
@site_admin.route("/groups/<int:group_id>/members/add", methods=["POST"])
@admin_required
def group_member_add(group_id):
user_id = request.form.get("user_id", type=int)
role = request.form.get("role", "viewer")
if role not in GROUP_ROLE_SET:
flash("Invalid role selected.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
if user_id:
db.add_group_member(user_id, group_id, role)
flash("Member added.", "success")
return redirect(url_for("site_admin.group_members", group_id=group_id))
@site_admin.route("/groups/<int:group_id>/members/<int:user_id>/remove", methods=["POST"])
@admin_required
def group_member_remove(group_id, user_id):
db.remove_group_member(user_id, group_id)
flash("Member removed.", "success")
return redirect(url_for("site_admin.group_members", group_id=group_id))
@site_admin.route("/groups/<int:group_id>/members/<int:user_id>/set-role", methods=["POST"])
@admin_required
def group_member_set_role(group_id, user_id):
member = db.get_group_member(user_id, group_id)
if member:
import json as _json
new_role = request.form.get("role", "viewer")
if new_role not in GROUP_ROLE_SET:
flash("Invalid role selected.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
perms = member["permissions"] if isinstance(member["permissions"], dict) else (_json.loads(member["permissions"]) if member["permissions"] else {})
db.update_member(user_id, group_id, new_role, perms)
flash(f"Role changed to '{new_role}'.", "success")
return redirect(url_for("site_admin.group_members", group_id=group_id))
@site_admin.route("/groups/<int:group_id>/members/invite", methods=["POST"])
@admin_required
def group_member_invite(group_id):
group = db.get_group_by_id(group_id)
if not group:
flash("Group not found.", "danger")
return redirect(url_for("site_admin.groups"))
username = request.form.get("username", "").strip()
email = request.form.get("email", "").strip()
role = request.form.get("role", "viewer")
if not username or not email:
flash("Username and email are required.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
if "@" not in email:
flash("Please provide a valid email address.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
if role not in GROUP_ROLE_SET:
flash("Invalid role selected.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
if db.count_active_group_invites(group_id) >= Config.INVITE_MAX_ACTIVE_PER_GROUP:
flash("Active invite limit reached for this group.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
if db.get_user_by_username(username):
flash("Username already exists.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
if db.get_active_invite_by_username(group_id, username):
flash("There is already an active invitation for this username.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
if db.get_user_by_email(email):
flash("Email address is already in use.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
if db.get_active_invite_by_email(group_id, email):
flash("There is already an active invitation for this email.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
token = db.create_group_invite(group_id, username, email, role, session["user_id"])
invite = db.get_invite_by_token(token)
invite_url = force_https_url(url_for("auth.accept_invite", token=token, _external=True))
mail_settings = db.get_site_mail_settings()
if mail_settings:
subject = f"Invitation to join {group['name']}"
text_body, html_body = build_invite_email(
username=username,
invite_url=invite_url,
expiry_text=f"in {Config.INVITE_EXPIRY_HOURS} hours",
group_name=group["name"],
role_name=role_label(role),
)
try:
send_mail(mail_settings, email, subject, text_body, html_body=html_body)
if invite:
db.mark_group_invite_sent(invite["id"], group_id)
flash(f"Invitation email sent to '{email}'.", "success")
except Exception:
flash(f"Invitation created, but email delivery failed. Share this link manually: {invite_url}", "warning")
else:
flash(f"Invitation created for '{username}'. Share this link: {invite_url}", "success")
return redirect(url_for("site_admin.group_members", group_id=group_id))
@site_admin.route("/groups/<int:group_id>/invites/<int:invite_id>/revoke", methods=["POST"])
@admin_required
def group_invite_revoke(group_id, invite_id):
db.revoke_group_invite(invite_id, group_id)
flash("Invitation revoked.", "success")
return redirect(url_for("site_admin.group_members", group_id=group_id))
@site_admin.route("/groups/<int:group_id>/invites/<int:invite_id>/resend", methods=["POST"])
@admin_required
def group_invite_resend(group_id, invite_id):
group = db.get_group_by_id(group_id)
invite = db.get_group_invite_by_id(invite_id, group_id)
if not invite:
flash("Invitation not found.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
if invite.get("accepted_at") or invite.get("revoked_at") or invite["expires_at"] <= datetime.utcnow():
flash("Invitation is no longer active.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
last_sent = invite.get("last_sent_at")
if last_sent and (datetime.utcnow() - last_sent) < timedelta(seconds=Config.INVITE_RESEND_COOLDOWN_SECONDS):
flash("Please wait before resending this invite again.", "warning")
return redirect(url_for("site_admin.group_members", group_id=group_id))
mail_settings = db.get_site_mail_settings()
if not mail_settings:
flash("No SMTP settings configured.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
invite_url = force_https_url(url_for("auth.accept_invite", token=invite["token"], _external=True))
subject = f"Invitation to join {group['name']}"
text_body, html_body = build_invite_email(
username=invite["invited_username"],
invite_url=invite_url,
expiry_text=f"on {invite['expires_at']}",
group_name=group["name"],
role_name=role_label(invite["role"]),
)
try:
send_mail(mail_settings, invite["invited_email"], subject, text_body, html_body=html_body)
db.mark_group_invite_sent(invite_id, group_id)
flash("Invitation email resent.", "success")
except Exception:
flash("Resend failed. Please verify SMTP settings and try again.", "danger")
return redirect(url_for("site_admin.group_members", group_id=group_id))
# ──────────────────────────────────────────────────────────────
# Nutzer verwalten
# ──────────────────────────────────────────────────────────────
@site_admin.route("/users")
@admin_required
def users():
return render_template(
"admin/users.html",
users=db.list_all_users(),
pending_invites=db.list_all_active_invites(),
)
@site_admin.route("/users/new", methods=["GET", "POST"])
@admin_required
def user_new():
groups = db.list_all_groups()
if request.method == "POST":
username = request.form.get("username", "").strip()
email = request.form.get("email", "").strip()
is_site_admin = request.form.get("is_site_admin") == "1"
group_id_raw = request.form.get("group_id", "").strip()
role = request.form.get("role", "viewer")
group_id = int(group_id_raw) if group_id_raw else None
error = None
if not username or not email:
error = "Username and email are required."
elif db.get_user_by_username(username):
error = "Username already taken."
elif db.get_user_by_email(email):
error = "Email address already in use."
elif db.get_active_invite_by_username_global(username):
error = "There is already an active invitation for this username."
elif db.get_active_invite_by_email_global(email):
error = "There is already an active invitation for this email."
elif group_id and role not in GROUP_ROLE_SET:
error = "Invalid role selected."
if error:
flash(error, "danger")
return render_template("admin/user_edit.html", user=None, groups=groups)
effective_role = role if group_id else "member"
token = db.create_group_invite(group_id, username, email, effective_role,
session["user_id"], is_site_admin=is_site_admin)
invite_url = force_https_url(url_for("auth.accept_invite", token=token, _external=True))
mail_settings = db.get_site_mail_settings()
if mail_settings:
if group_id:
group = db.get_group_by_id(group_id)
subject = f"Invitation to join {group['name']}"
body, html_body = build_invite_email(
username=username,
invite_url=invite_url,
expiry_text=f"in {Config.INVITE_EXPIRY_HOURS} hours",
group_name=group["name"],
role_name=role_label(effective_role),
)
else:
subject = "You have been invited to MCLogger"
body, html_body = build_invite_email(
username=username,
invite_url=invite_url,
expiry_text=f"in {Config.INVITE_EXPIRY_HOURS} hours",
)
try:
send_mail(mail_settings, email, subject, body, html_body=html_body)
invite = db.get_invite_by_token(token)
if invite:
db.mark_invite_sent_global(invite["id"])
flash(f"Invitation email sent to '{email}'.", "success")
except Exception:
flash(
f"Invitation created, but email delivery failed. "
f"Share this link manually: {invite_url}",
"warning",
)
else:
flash(f"Invitation created for '{username}'. Share this link: {invite_url}", "success")
return redirect(url_for("site_admin.users"))
return render_template("admin/user_edit.html", user=None, groups=groups)
@site_admin.route("/users/invites/<int:invite_id>/revoke", methods=["POST"])
@admin_required
def user_invite_revoke(invite_id):
db.revoke_invite_global(invite_id)
flash("Invitation revoked.", "success")
return redirect(url_for("site_admin.users"))
@site_admin.route("/users/invites/<int:invite_id>/resend", methods=["POST"])
@admin_required
def user_invite_resend(invite_id):
invite = db.get_invite_by_id_global(invite_id)
if not invite:
flash("Invitation not found.", "danger")
return redirect(url_for("site_admin.users"))
if invite.get("accepted_at") or invite.get("revoked_at") or invite["expires_at"] <= datetime.utcnow():
flash("Invitation is no longer active.", "danger")
return redirect(url_for("site_admin.users"))
last_sent = invite.get("last_sent_at")
if last_sent and (datetime.utcnow() - last_sent) < timedelta(seconds=Config.INVITE_RESEND_COOLDOWN_SECONDS):
flash("Please wait before resending this invite again.", "warning")
return redirect(url_for("site_admin.users"))
mail_settings = db.get_site_mail_settings()
if not mail_settings:
flash("No SMTP settings configured.", "danger")
return redirect(url_for("site_admin.users"))
invite_url = force_https_url(url_for("auth.accept_invite", token=invite["token"], _external=True))
if invite["group_id"]:
group = db.get_group_by_id(invite["group_id"])
subject = f"Invitation to join {group['name']}"
body, html_body = build_invite_email(
username=invite["invited_username"],
invite_url=invite_url,
expiry_text=f"on {invite['expires_at']}",
group_name=group["name"],
role_name=role_label(invite["role"]),
)
else:
subject = "You have been invited to MCLogger"
body, html_body = build_invite_email(
username=invite["invited_username"],
invite_url=invite_url,
expiry_text=f"on {invite['expires_at']}",
)
try:
send_mail(mail_settings, invite["invited_email"], subject, body, html_body=html_body)
db.mark_invite_sent_global(invite_id)
flash("Invitation email resent.", "success")
except Exception:
flash("Resend failed. Please verify SMTP settings and try again.", "danger")
return redirect(url_for("site_admin.users"))
@site_admin.route("/users/<int:user_id>/edit", methods=["GET", "POST"])
@admin_required
def user_edit(user_id):
user = db.get_user_by_id(user_id)
if not user:
flash("User not found.", "danger")
return redirect(url_for("site_admin.users"))
if request.method == "POST":
username = request.form.get("username", "").strip()
email = request.form.get("email", "").strip()
is_site_admin = request.form.get("is_site_admin") == "1"
new_password = request.form.get("new_password", "")
existing = db.get_user_by_email(email)
if existing and existing["id"] != user_id:
flash("Email address already in use.", "danger")
return render_template("admin/user_edit.html", user=user)
db.update_user(user_id, username, email, is_site_admin)
if new_password:
db.change_password(user_id, new_password)
flash("Password changed.", "info")
flash("User updated.", "success")
return redirect(url_for("site_admin.users"))
return render_template("admin/user_edit.html", user=user)
@site_admin.route("/users/<int:user_id>/delete", methods=["POST"])
@admin_required
def user_delete(user_id):
if user_id == session.get("user_id"):
flash("You cannot delete yourself.", "danger")
else:
db.delete_user(user_id)
flash("User deleted.", "success")
return redirect(url_for("site_admin.users"))
# ──────────────────────────────────────────────────────────────
# Als Gruppe anzeigen (Site-Admin liest Gruppen-DB)
# ──────────────────────────────────────────────────────────────
@site_admin.route("/view-group/<int:group_id>", methods=["GET", "POST"])
@admin_required
def view_group(group_id):
"""Site Admin temporarily switches into a group to view its MC data."""
if request.method == "GET":
flash("Please use the Browse button from the admin panel.", "warning")
return redirect(url_for("site_admin.dashboard"))
group = db.get_group_by_id(group_id)
if not group:
flash("Group not found.", "danger")
return redirect(url_for("site_admin.dashboard"))
if not db.has_db_configured(group_id):
flash("No database configured for this group.", "warning")
return redirect(url_for("site_admin.dashboard"))
# Alle Berechtigungen als Admin
all_perms = {k: True for k in ["view_dashboard","view_players","view_sessions",
"view_chat","view_commands","view_deaths","view_blocks",
"view_proxy","view_server_events","view_perms"]}
session["group_id"] = group_id
session["group_name"] = group["name"]
session["role"] = "group_owner"
session["permissions"] = all_perms
session["admin_viewing"] = True
return redirect(url_for("panel.dashboard"))
@site_admin.route("/stop-view", methods=["GET", "POST"])
@admin_required
def stop_view():
"""Kehrt zum Site-Admin-Dashboard zurück."""
if request.method == "GET":
flash("Please use the Back to Admin button.", "warning")
return redirect(url_for("site_admin.dashboard"))
session.pop("group_id", None)
session.pop("group_name", None)
session.pop("role", None)
session.pop("permissions", None)
session.pop("admin_viewing", None)
return redirect(url_for("site_admin.dashboard"))