Files
MClogger/web/blueprints/group_admin.py
simon 3b78f5dfb1 modified: web/app.py
modified:   web/blueprints/auth.py
	modified:   web/blueprints/group_admin.py
	modified:   web/blueprints/site_admin.py
	new file:   web/limiter.py
	modified:   web/panel_db.py
	modified:   web/requirements.txt
	new file:   web/templates/429.html
	new file:   web/templates/admin/audit_log.html
	modified:   web/templates/admin/base.html
2026-04-14 13:02:41 +02:00

363 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MCLogger Gruppen-Admin-Bereich
Gruppen-Admins können ihre Mitglieder und MC-DB-Verbindung verwalten.
"""
import json
from datetime import datetime, timedelta
from functools import wraps
from flask import Blueprint, render_template, request, redirect, url_for, session, flash
from config import Config
from mailer import send_mail, build_invite_email, force_https_url
import panel_db as db
from roles import GROUP_MANAGEMENT_ROLES, GROUP_ROLE_OPTIONS, GROUP_ROLE_SET, OWNER_ONLY_ROLES, role_label
from limiter import limiter
group_admin = Blueprint("group_admin", __name__, url_prefix="/group-admin")
# Role options that group admins are allowed to assign (owner excluded)
_NON_OWNER_ROLE_OPTIONS = [(r, l) for r, l in GROUP_ROLE_OPTIONS if r not in OWNER_ONLY_ROLES]
ALL_PERMISSIONS = [
("view_dashboard", "Dashboard"),
("view_players", "Players"),
("view_sessions", "Sessions"),
("view_chat", "Chat"),
("view_commands", "Commands"),
("view_deaths", "Deaths"),
("view_blocks", "Block Events"),
("view_proxy", "Proxy Events"),
("view_server_events", "Server Events"),
("view_perms", "Permissions"),
]
def group_admin_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not session.get("user_id"):
return redirect(url_for("auth.login"))
if session.get("is_site_admin"):
return redirect(url_for("site_admin.dashboard"))
if session.get("role") not in GROUP_MANAGEMENT_ROLES:
flash("You do not have group admin permission.", "danger")
return redirect(url_for("panel.dashboard"))
return f(*args, **kwargs)
return decorated
@group_admin.route("/")
@group_admin_required
def dashboard():
group_id = session["group_id"]
group = db.get_group_by_id(group_id)
members = db.get_group_members(group_id)
has_db = db.has_db_configured(group_id)
stats = {
"member_count": len(members),
"admin_count": sum(1 for m in members if m.get("role") in GROUP_MANAGEMENT_ROLES),
"db_configured": bool(has_db),
}
return render_template("group_admin/dashboard.html",
group=group, members=members, has_db=has_db, stats=stats)
# ──────────────────────────────────────────────────────────────
# Mitglieder
# ──────────────────────────────────────────────────────────────
@group_admin.route("/members")
@group_admin_required
def members():
group_id = session["group_id"]
group = db.get_group_by_id(group_id)
members = db.get_group_members(group_id)
pending_invites = db.list_active_group_invites(group_id)
all_users = db.list_all_users()
member_ids = {m["id"] for m in members}
non_members = [u for u in all_users if u["id"] not in member_ids and not u["is_site_admin"]]
return render_template("group_admin/members.html",
group=group, members=members, non_members=non_members, pending_invites=pending_invites,
all_permissions=ALL_PERMISSIONS,
role_options=_NON_OWNER_ROLE_OPTIONS,
role_label=role_label)
@group_admin.route("/members/add", methods=["POST"])
@group_admin_required
def member_add():
group_id = session["group_id"]
user_id = request.form.get("user_id", type=int)
role = request.form.get("role", "viewer")
if role in OWNER_ONLY_ROLES:
flash("The Group Owner role can only be assigned by a Site Admin.", "danger")
return redirect(url_for("group_admin.members"))
if role not in GROUP_ROLE_SET:
flash("Invalid role selected.", "danger")
return redirect(url_for("group_admin.members"))
if user_id:
db.add_group_member(user_id, group_id, role)
target_user = db.get_user_by_id(user_id)
db.log_audit_event(
session["user_id"], session["username"], "member.added",
entity_type="user", entity_id=user_id,
details={"role": role, "target": target_user["username"] if target_user else str(user_id)},
group_id=group_id, ip_address=request.remote_addr,
)
flash("Member added.", "success")
return redirect(url_for("group_admin.members"))
@group_admin.route("/members/invite", methods=["POST"])
@group_admin_required
@limiter.limit("30 per hour", methods=["POST"])
def member_invite():
group_id = session["group_id"]
username = request.form.get("username", "").strip()
email = request.form.get("email", "").strip()
role = request.form.get("role", "viewer")
if not username or not email:
flash("Username and email are required.", "danger")
return redirect(url_for("group_admin.members"))
if "@" not in email:
flash("Please provide a valid email address.", "danger")
return redirect(url_for("group_admin.members"))
if role not in GROUP_ROLE_SET:
flash("Invalid role selected.", "danger")
return redirect(url_for("group_admin.members"))
if role in OWNER_ONLY_ROLES:
flash("The Group Owner role can only be assigned by a Site Admin.", "danger")
return redirect(url_for("group_admin.members"))
if db.count_active_group_invites(group_id) >= Config.INVITE_MAX_ACTIVE_PER_GROUP:
flash("Active invite limit reached for this group. Revoke old invites or wait for expiry.", "danger")
return redirect(url_for("group_admin.members"))
if db.get_user_by_username(username):
flash("Username already exists.", "danger")
return redirect(url_for("group_admin.members"))
if db.get_active_invite_by_username(group_id, username):
flash("There is already an active invitation for this username in the group.", "danger")
return redirect(url_for("group_admin.members"))
if db.get_user_by_email(email):
flash("Email address is already in use.", "danger")
return redirect(url_for("group_admin.members"))
if db.get_active_invite_by_email(group_id, email):
flash("There is already an active invitation for this email in the group.", "danger")
return redirect(url_for("group_admin.members"))
token = db.create_group_invite(group_id, username, email, role, session["user_id"])
invite = db.get_invite_by_token(token)
invite_url = force_https_url(url_for("auth.accept_invite", token=token, _external=True))
db.log_audit_event(
session["user_id"], session["username"], "invite.created",
entity_type="invite", entity_id=invite["id"] if invite else None,
details={"username": username, "email": email, "role": role},
group_id=group_id, ip_address=request.remote_addr,
)
mail_settings = db.get_site_mail_settings()
if mail_settings:
subject = f"Invitation to join {session.get('group_name', 'your group')}"
text_body, html_body = build_invite_email(
username=username,
invite_url=invite_url,
expiry_text=f"in {Config.INVITE_EXPIRY_HOURS} hours",
group_name=session.get("group_name", "your group"),
role_name=role_label(role),
)
try:
send_mail(mail_settings, email, subject, text_body, html_body=html_body)
if invite:
db.mark_group_invite_sent(invite["id"], group_id)
flash(f"Invitation email sent to '{email}'.", "success")
except Exception:
flash(f"Invitation created, but email delivery failed. Share this link manually: {invite_url}", "warning")
else:
flash(f"Invitation created for '{username}'. Share this link: {invite_url}", "success")
return redirect(url_for("group_admin.members"))
@group_admin.route("/invites/<int:invite_id>/resend", methods=["POST"])
@group_admin_required
@limiter.limit("20 per hour", methods=["POST"])
def resend_invite(invite_id):
group_id = session["group_id"]
invite = db.get_group_invite_by_id(invite_id, group_id)
if not invite:
flash("Invitation not found.", "danger")
return redirect(url_for("group_admin.members"))
if invite.get("accepted_at") or invite.get("revoked_at") or invite["expires_at"] <= datetime.utcnow():
flash("Invitation is no longer active.", "danger")
return redirect(url_for("group_admin.members"))
last_sent_at = invite.get("last_sent_at")
if last_sent_at and (datetime.utcnow() - last_sent_at) < timedelta(seconds=Config.INVITE_RESEND_COOLDOWN_SECONDS):
flash("Please wait before resending this invite again.", "warning")
return redirect(url_for("group_admin.members"))
mail_settings = db.get_site_mail_settings()
if not mail_settings:
flash("No SMTP settings configured by Site Admin.", "danger")
return redirect(url_for("group_admin.members"))
invite_url = force_https_url(url_for("auth.accept_invite", token=invite["token"], _external=True))
subject = f"Invitation to join {session.get('group_name', 'your group')}"
text_body, html_body = build_invite_email(
username=invite["invited_username"],
invite_url=invite_url,
expiry_text=f"on {invite['expires_at']}",
group_name=session.get("group_name", "your group"),
role_name=role_label(invite["role"]),
)
try:
send_mail(mail_settings, invite["invited_email"], subject, text_body, html_body=html_body)
db.mark_group_invite_sent(invite_id, group_id)
db.log_audit_event(
session["user_id"], session["username"], "invite.resent",
entity_type="invite", entity_id=invite_id,
details={"to": invite["invited_email"], "username": invite["invited_username"]},
group_id=group_id, ip_address=request.remote_addr,
)
flash("Invitation email resent.", "success")
except Exception:
flash("Resend failed. Please verify SMTP settings and try again.", "danger")
return redirect(url_for("group_admin.members"))
@group_admin.route("/invites/<int:invite_id>/revoke", methods=["POST"])
@group_admin_required
def revoke_invite(invite_id):
invite = db.get_group_invite_by_id(invite_id, session["group_id"])
db.revoke_group_invite(invite_id, session["group_id"])
db.log_audit_event(
session["user_id"], session["username"], "invite.revoked",
entity_type="invite", entity_id=invite_id,
details={"username": invite["invited_username"] if invite else None},
group_id=session["group_id"], ip_address=request.remote_addr,
)
flash("Invitation revoked.", "success")
return redirect(url_for("group_admin.members"))
@group_admin.route("/members/<int:user_id>/edit", methods=["GET", "POST"])
@group_admin_required
def member_edit(user_id):
group_id = session["group_id"]
group = db.get_group_by_id(group_id)
member = db.get_group_member(user_id, group_id)
user = db.get_user_by_id(user_id)
if not member or not user:
flash("Member not found.", "danger")
return redirect(url_for("group_admin.members"))
raw_perms = member.get("permissions")
current_perms = json.loads(raw_perms) if isinstance(raw_perms, str) else (raw_perms or {})
if request.method == "POST":
role = request.form.get("role", "viewer")
if role in OWNER_ONLY_ROLES:
flash("The Group Owner role can only be assigned by a Site Admin.", "danger")
return redirect(url_for("group_admin.members"))
if role not in GROUP_ROLE_SET:
flash("Invalid role selected.", "danger")
return redirect(url_for("group_admin.members"))
new_perms = {key: bool(request.form.get(f"perm_{key}")) for key, _ in ALL_PERMISSIONS}
old_role = member.get("role")
db.update_member(user_id, group_id, role, new_perms)
db.log_audit_event(
session["user_id"], session["username"], "member.updated",
entity_type="user", entity_id=user_id,
details={"target": user["username"], "old_role": old_role, "new_role": role},
group_id=group_id, ip_address=request.remote_addr,
)
flash("Permissions updated.", "success")
return redirect(url_for("group_admin.members"))
return render_template("group_admin/member_edit.html",
group=group, user=user, member=member,
current_perms=current_perms, all_permissions=ALL_PERMISSIONS,
role_options=_NON_OWNER_ROLE_OPTIONS,
role_label=role_label)
@group_admin.route("/members/<int:user_id>/remove", methods=["POST"])
@group_admin_required
def member_remove(user_id):
if user_id == session["user_id"]:
flash("You cannot remove yourself.", "danger")
else:
target_user = db.get_user_by_id(user_id)
db.remove_group_member(user_id, session["group_id"])
db.log_audit_event(
session["user_id"], session["username"], "member.removed",
entity_type="user", entity_id=user_id,
details={"target": target_user["username"] if target_user else str(user_id)},
group_id=session["group_id"], ip_address=request.remote_addr,
)
flash("Member removed.", "success")
return redirect(url_for("group_admin.members"))
# ──────────────────────────────────────────────────────────────
# Datenbank-Konfiguration
# ──────────────────────────────────────────────────────────────
@group_admin.route("/database", methods=["GET", "POST"])
@group_admin_required
def database():
group_id = session["group_id"]
group = db.get_group_by_id(group_id)
has_db = db.has_db_configured(group_id)
error = None
creds = db.get_group_db_creds(group_id)
if request.method == "POST":
host = request.form.get("host", "").strip()
port = request.form.get("port", "3306").strip()
user = request.form.get("user", "").strip()
password = request.form.get("password", "")
database_name = request.form.get("database", "").strip()
# If password left blank and creds already exist, keep the stored password
if not password and creds:
password = creds["password"]
if not all([host, port, user, database_name]):
error = "Host, Port, User and Database name are required."
elif not password:
error = "Password is required."
else:
try:
import pymysql
test_conn = pymysql.connect(
host=host, port=int(port), user=user,
password=password, database=database_name,
connect_timeout=5
)
test_conn.close()
db.set_group_db_creds(group_id, host, int(port), user, password, database_name)
flash("Database connection saved and tested ✓", "success")
return redirect(url_for("group_admin.database"))
except Exception as e:
error = f"Connection test failed: {e}"
return render_template("group_admin/database.html",
group=group, has_db=has_db, creds=creds, error=error)
@group_admin.route("/database/delete", methods=["POST"])
@group_admin_required
def database_delete():
db.delete_group_db_creds(session["group_id"])
flash("Database connection removed.", "success")
return redirect(url_for("group_admin.database"))