""" MCLogger – Site-Admin-Bereich Verwaltet alle Gruppen und Nutzer global. """ from functools import wraps from datetime import datetime, timedelta from flask import Blueprint, render_template, request, redirect, url_for, session, flash from config import Config from mailer import send_mail import panel_db as db from roles import GROUP_MANAGEMENT_ROLES, GROUP_ROLE_OPTIONS, GROUP_ROLE_SET, role_label site_admin = Blueprint("site_admin", __name__, url_prefix="/admin") def admin_required(f): @wraps(f) def decorated(*args, **kwargs): if not session.get("is_site_admin"): return redirect(url_for("auth.admin_login")) return f(*args, **kwargs) return decorated # ────────────────────────────────────────────────────────────── # Dashboard # ────────────────────────────────────────────────────────────── @site_admin.route("/") @admin_required def dashboard(): try: groups = db.list_all_groups() or [] users = db.list_all_users() or [] has_mail = db.has_site_mail_settings() for g in groups: try: g["has_db"] = db.has_db_configured(g["id"]) except Exception: g["has_db"] = False except Exception: groups, users, has_mail = [], [], False stats = { "group_count": len(groups), "user_count": len(users), "db_configured": sum(1 for g in groups if g.get("has_db")), "admin_count": sum(1 for u in users if u.get("is_site_admin")), "mail_configured": int(has_mail), } return render_template("admin/dashboard.html", groups=groups, users=users, stats=stats) @site_admin.route("/mail", methods=["GET", "POST"]) @admin_required def mail_settings(): settings = db.get_site_mail_settings() error = None if request.method == "POST": host = request.form.get("host", "").strip() port = request.form.get("port", type=int) or 0 username = request.form.get("username", "").strip() password = request.form.get("password", "") from_email = request.form.get("from_email", "").strip() from_name = request.form.get("from_name", "").strip() use_tls = request.form.get("use_tls") == "1" action = request.form.get("action", 
"save") test_recipient = request.form.get("test_recipient", "").strip() if settings and not password: password = settings["password"] if not all([host, port, username, password, from_email]): error = "Host, port, username, password and from email are required." elif "@" not in from_email: error = "Please provide a valid from email address." else: candidate = { "host": host, "port": port, "username": username, "password": password, "from_email": from_email, "from_name": from_name, "use_tls": use_tls, } try: if action == "test": send_mail( candidate, test_recipient or from_email, "MCLogger SMTP Test", "This is a test email from your MCLogger admin panel.", ) flash("Test email sent successfully.", "success") else: send_mail( candidate, test_recipient or from_email, "MCLogger SMTP Verification", "Your SMTP settings were verified successfully and have been saved.", ) db.set_site_mail_settings(host, port, username, password, from_email, from_name, use_tls) flash("Mail settings saved and verified.", "success") return redirect(url_for("site_admin.mail_settings")) except Exception as exc: error = f"SMTP connection failed: {exc}" settings = { "host": host, "port": port, "username": username, "password": password, "from_email": from_email, "from_name": from_name, "use_tls": use_tls, } return render_template("admin/mail_settings.html", settings=settings, error=error) @site_admin.route("/mail/delete", methods=["POST"]) @admin_required def mail_settings_delete(): db.delete_site_mail_settings() flash("Mail settings removed.", "success") return redirect(url_for("site_admin.mail_settings")) # ────────────────────────────────────────────────────────────── # Gruppen verwalten # ────────────────────────────────────────────────────────────── @site_admin.route("/groups") @admin_required def groups(): all_groups = db.list_all_groups() for g in all_groups: g["has_db"] = db.has_db_configured(g["id"]) return render_template("admin/groups.html", groups=all_groups) 
def _members_redirect(group_id):
    """Return a redirect to the member list of the given group."""
    return redirect(url_for("site_admin.group_members", group_id=group_id))


def _invite_rejection_reason(group_id, username, email, role):
    """Return the message explaining why an invite must be rejected, or None.

    Checks run in a fixed order and stop at the first failure, so the
    database is only queried once the cheap form checks have passed.
    """
    if not username or not email:
        return "Username and email are required."
    if "@" not in email:
        return "Please provide a valid email address."
    if role not in GROUP_ROLE_SET:
        return "Invalid role selected."
    if db.count_active_group_invites(group_id) >= Config.INVITE_MAX_ACTIVE_PER_GROUP:
        return "Active invite limit reached for this group."
    if db.get_user_by_username(username):
        return "Username already exists."
    if db.get_active_invite_by_username(group_id, username):
        return "There is already an active invitation for this username."
    if db.get_user_by_email(email):
        return "Email address is already in use."
    if db.get_active_invite_by_email(group_id, email):
        return "There is already an active invitation for this email."
    return None


@site_admin.route("/groups/new", methods=["GET", "POST"])
@admin_required
def group_new():
    """Create a new group; re-render the form when validation fails."""
    if request.method == "POST":
        name = request.form.get("name", "").strip()
        desc = request.form.get("description", "").strip()
        if not name:
            flash("Group name must not be empty.", "danger")
        elif db.get_group_by_name(name):
            flash("A group with that name already exists.", "danger")
        else:
            db.create_group(name, desc)
            flash(f"Group '{name}' created.", "success")
            return redirect(url_for("site_admin.groups"))
    return render_template("admin/group_edit.html", group=None)


@site_admin.route("/groups/<int:group_id>/edit", methods=["GET", "POST"])
@admin_required
def group_edit(group_id):
    """Edit name and description of an existing group."""
    group = db.get_group_by_id(group_id)
    if not group:
        flash("Group not found.", "danger")
        return redirect(url_for("site_admin.groups"))

    if request.method == "POST":
        name = request.form.get("name", "").strip()
        desc = request.form.get("description", "").strip()
        # Same uniqueness rule as group_new, ignoring the group being edited.
        # NOTE(review): assumes get_group_by_name returns a dict with "id" — confirm.
        clash = db.get_group_by_name(name) if name else None
        if not name:
            flash("Group name must not be empty.", "danger")
        elif clash and clash["id"] != group_id:
            flash("A group with that name already exists.", "danger")
        else:
            db.update_group(group_id, name, desc)
            flash("Group updated.", "success")
            return redirect(url_for("site_admin.groups"))
    return render_template("admin/group_edit.html", group=group)


@site_admin.route("/groups/<int:group_id>/delete", methods=["POST"])
@admin_required
def group_delete(group_id):
    """Delete a group and return to the group list."""
    db.delete_group(group_id)
    flash("Group deleted.", "success")
    return redirect(url_for("site_admin.groups"))


@site_admin.route("/groups/<int:group_id>/members")
@admin_required
def group_members(group_id):
    """Show members, pending invites and addable users of a group."""
    group = db.get_group_by_id(group_id)
    if not group:
        # Without this guard the template would be rendered with group=None.
        flash("Group not found.", "danger")
        return redirect(url_for("site_admin.groups"))

    members = db.get_group_members(group_id)
    pending_invites = db.list_active_group_invites(group_id)
    member_ids = {m["id"] for m in members}
    non_members = [u for u in db.list_all_users() if u["id"] not in member_ids]
    return render_template(
        "admin/group_members.html",
        group=group,
        members=members,
        non_members=non_members,
        pending_invites=pending_invites,
        role_options=GROUP_ROLE_OPTIONS,
        role_label=role_label,
        management_roles=GROUP_MANAGEMENT_ROLES,
    )


@site_admin.route("/groups/<int:group_id>/members/add", methods=["POST"])
@admin_required
def group_member_add(group_id):
    """Add an existing user to a group with the role chosen in the form."""
    user_id = request.form.get("user_id", type=int)
    role = request.form.get("role", "viewer")
    if role not in GROUP_ROLE_SET:
        flash("Invalid role selected.", "danger")
    elif not user_id:
        # Missing or non-numeric user_id: tell the admin instead of a silent no-op.
        flash("No user selected.", "danger")
    else:
        db.add_group_member(user_id, group_id, role)
        flash("Member added.", "success")
    return _members_redirect(group_id)


@site_admin.route("/groups/<int:group_id>/members/<int:user_id>/remove", methods=["POST"])
@admin_required
def group_member_remove(group_id, user_id):
    """Remove a user from a group."""
    db.remove_group_member(user_id, group_id)
    flash("Member removed.", "success")
    return _members_redirect(group_id)


@site_admin.route("/groups/<int:group_id>/members/<int:user_id>/set-role", methods=["POST"])
@admin_required
def group_member_set_role(group_id, user_id):
    """Change a member's role while keeping the member's stored permissions."""
    import json

    member = db.get_group_member(user_id, group_id)
    if not member:
        flash("Member not found.", "danger")
        return _members_redirect(group_id)

    new_role = request.form.get("role", "viewer")
    if new_role not in GROUP_ROLE_SET:
        flash("Invalid role selected.", "danger")
        return _members_redirect(group_id)

    # Permissions may already be a dict or still be a JSON string (or empty).
    raw_perms = member["permissions"]
    if isinstance(raw_perms, dict):
        perms = raw_perms
    elif raw_perms:
        perms = json.loads(raw_perms)
    else:
        perms = {}

    db.update_member(user_id, group_id, new_role, perms)
    flash(f"Role changed to '{new_role}'.", "success")
    return _members_redirect(group_id)


@site_admin.route("/groups/<int:group_id>/members/invite", methods=["POST"])
@admin_required
def group_member_invite(group_id):
    """Create an invitation for a new user and email it when SMTP is set up.

    When no mail settings exist, or delivery fails, the invite link is shown
    in a flash message so it can be shared manually.
    """
    group = db.get_group_by_id(group_id)
    if not group:
        flash("Group not found.", "danger")
        return redirect(url_for("site_admin.groups"))

    username = request.form.get("username", "").strip()
    email = request.form.get("email", "").strip()
    role = request.form.get("role", "viewer")

    problem = _invite_rejection_reason(group_id, username, email, role)
    if problem:
        flash(problem, "danger")
        return _members_redirect(group_id)

    token = db.create_group_invite(group_id, username, email, role, session["user_id"])
    invite = db.get_invite_by_token(token)
    invite_url = url_for("auth.accept_invite", token=token, _external=True)

    smtp = db.get_site_mail_settings()
    if not smtp:
        flash(f"Invitation created for '{username}'. Share this link: {invite_url}", "success")
        return _members_redirect(group_id)

    subject = f"Invitation to join {group['name']}"
    text_body = (
        f"Hello {username},\n\n"
        f"You have been invited to join the group '{group['name']}' on MCLogger as {role_label(role)}.\n"
        f"Open this link to create your account:\n\n{invite_url}\n\n"
        f"This invite expires in {Config.INVITE_EXPIRY_HOURS} hours.\n"
    )
    try:
        send_mail(smtp, email, subject, text_body)
        if invite:
            db.mark_group_invite_sent(invite["id"], group_id)
        flash(f"Invitation email sent to '{email}'.", "success")
    except Exception:
        # The invite itself exists; only delivery failed.
        flash(f"Invitation created, but email delivery failed. Share this link manually: {invite_url}", "warning")
    return _members_redirect(group_id)


@site_admin.route("/groups/<int:group_id>/invites/<int:invite_id>/revoke", methods=["POST"])
@admin_required
def group_invite_revoke(group_id, invite_id):
    """Revoke a pending invitation of a group."""
    db.revoke_group_invite(invite_id, group_id)
    flash("Invitation revoked.", "success")
    return _members_redirect(group_id)


@site_admin.route("/groups/<int:group_id>/invites/<int:invite_id>/resend", methods=["POST"])
@admin_required
def group_invite_resend(group_id, invite_id):
    """Resend the email of a still-active invitation, honouring the cooldown."""
    group = db.get_group_by_id(group_id)
    if not group:
        # group['name'] is needed for the mail below; bail out early.
        flash("Group not found.", "danger")
        return redirect(url_for("site_admin.groups"))

    invite = db.get_group_invite_by_id(invite_id, group_id)
    if not invite:
        flash("Invitation not found.", "danger")
        return _members_redirect(group_id)

    # NOTE(review): compared as naive UTC; presumably the stored timestamps
    # are naive UTC as well — confirm before switching to aware datetimes.
    now = datetime.utcnow()
    if invite.get("accepted_at") or invite.get("revoked_at") or invite["expires_at"] <= now:
        flash("Invitation is no longer active.", "danger")
        return _members_redirect(group_id)

    last_sent = invite.get("last_sent_at")
    cooldown = timedelta(seconds=Config.INVITE_RESEND_COOLDOWN_SECONDS)
    if last_sent and (now - last_sent) < cooldown:
        flash("Please wait before resending this invite again.", "warning")
        return _members_redirect(group_id)

    smtp = db.get_site_mail_settings()
    if not smtp:
        flash("No SMTP settings configured.", "danger")
        return _members_redirect(group_id)

    invite_url = url_for("auth.accept_invite", token=invite["token"], _external=True)
    subject = f"Invitation to join {group['name']}"
    text_body = (
        f"Hello {invite['invited_username']},\n\n"
        f"You have been invited to join the group '{group['name']}' on MCLogger as {role_label(invite['role'])}.\n"
        f"Open this link to create your account:\n\n{invite_url}\n\n"
        f"This invite expires on {invite['expires_at']}.\n"
    )
    try:
        send_mail(smtp, invite["invited_email"], subject, text_body)
        db.mark_group_invite_sent(invite_id, group_id)
        flash("Invitation email resent.", "success")
    except Exception:
        flash("Resend failed. Please verify SMTP settings and try again.", "danger")
    return _members_redirect(group_id)


# ──────────────────────────────────────────────────────────────
# User management
# ──────────────────────────────────────────────────────────────

@site_admin.route("/users")
@admin_required
def users():
    """List all users."""
    return render_template("admin/users.html", users=db.list_all_users())


@site_admin.route("/users/new", methods=["GET", "POST"])
@admin_required
def user_new():
    """Create a new user; re-render the form when validation fails."""
    if request.method == "POST":
        username = request.form.get("username", "").strip()
        email = request.form.get("email", "").strip()
        password = request.form.get("password", "")
        is_site_admin = request.form.get("is_site_admin") == "1"

        if not username or not email or not password:
            flash("All fields are required.", "danger")
        elif db.get_user_by_username(username):
            flash("Username already taken.", "danger")
        elif db.get_user_by_email(email):
            flash("Email address already in use.", "danger")
        else:
            db.create_user(username, email, password, is_site_admin)
            flash(f"User '{username}' created.", "success")
            return redirect(url_for("site_admin.users"))
    return render_template("admin/user_edit.html", user=None)


@site_admin.route("/users/<int:user_id>/edit", methods=["GET", "POST"])
@admin_required
def user_edit(user_id):
    """Edit a user's name, email and admin flag; optionally set a new password."""
    user = db.get_user_by_id(user_id)
    if not user:
        flash("User not found.", "danger")
        return redirect(url_for("site_admin.users"))

    if request.method == "POST":
        username = request.form.get("username", "").strip()
        email = request.form.get("email", "").strip()
        is_site_admin = request.form.get("is_site_admin") == "1"
        new_password = request.form.get("new_password", "")

        # Mirror the validation of user_new so an edit cannot blank out or
        # duplicate a username/email.
        if not username or not email:
            flash("Username and email are required.", "danger")
            return render_template("admin/user_edit.html", user=user)

        same_name = db.get_user_by_username(username)
        if same_name and same_name["id"] != user_id:
            flash("Username already taken.", "danger")
            return render_template("admin/user_edit.html", user=user)

        same_email = db.get_user_by_email(email)
        if same_email and same_email["id"] != user_id:
            flash("Email address already in use.", "danger")
            return render_template("admin/user_edit.html", user=user)

        db.update_user(user_id, username, email, is_site_admin)
        if new_password:
            db.change_password(user_id, new_password)
            flash("Password changed.", "info")
        flash("User updated.", "success")
        return redirect(url_for("site_admin.users"))
    return render_template("admin/user_edit.html", user=user)


@site_admin.route("/users/<int:user_id>/delete", methods=["POST"])
@admin_required
def user_delete(user_id):
    """Delete a user, refusing to delete the currently logged-in account."""
    if user_id == session.get("user_id"):
        flash("You cannot delete yourself.", "danger")
    else:
        db.delete_user(user_id)
        flash("User deleted.", "success")
    return redirect(url_for("site_admin.users"))


# ──────────────────────────────────────────────────────────────
# View as group (site admin reads the group's database)
# ──────────────────────────────────────────────────────────────

@site_admin.route("/view-group/<int:group_id>", methods=["GET", "POST"])
@admin_required
def view_group(group_id):
    """Site Admin temporarily switches into a group to view its MC data.

    Only POST performs the switch; a GET is bounced back to the dashboard
    with a hint. On success the session is populated with the group, the
    ``group_owner`` role and every view permission, and ``admin_viewing``
    is set before redirecting to the panel dashboard.
    """
    if request.method == "GET":
        flash("Please use the Browse button from the admin panel.", "warning")
        return redirect(url_for("site_admin.dashboard"))

    group = db.get_group_by_id(group_id)
    if not group:
        flash("Group not found.", "danger")
        return redirect(url_for("site_admin.dashboard"))
    if not db.has_db_configured(group_id):
        flash("No database configured for this group.", "warning")
        return redirect(url_for("site_admin.dashboard"))

    # Grant every view permission while acting as admin.
    all_perms = dict.fromkeys(
        (
            "view_dashboard", "view_players", "view_sessions",
            "view_chat", "view_commands", "view_deaths", "view_blocks",
            "view_proxy", "view_server_events", "view_perms",
        ),
        True,
    )
    session["group_id"] = group_id
    session["group_name"] = group["name"]
    session["role"] = "group_owner"
    session["permissions"] = all_perms
    session["admin_viewing"] = True
    return redirect(url_for("panel.dashboard"))


@site_admin.route("/stop-view", methods=["GET", "POST"])
@admin_required
def stop_view():
    """Leave the group view and return to the site admin dashboard."""
    if request.method == "GET":
        flash("Please use the Back to Admin button.", "warning")
        return redirect(url_for("site_admin.dashboard"))

    # Drop everything view_group put into the session.
    for key in ("group_id", "group_name", "role", "permissions", "admin_viewing"):
        session.pop(key, None)
    return redirect(url_for("site_admin.dashboard"))