Files
MClogger/web/blueprints/auth.py
simon bdf83bd275 modified: web/app.py
modified:   web/blueprints/auth.py
	modified:   web/blueprints/site_admin.py
	modified:   web/config.py
	modified:   web/panel_db.py
	modified:   web/templates/admin/audit_log.html
	modified:   web/templates/admin/dashboard.html
	new file:   web/templates/auth/consent.html
2026-04-15 11:05:21 +02:00

245 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MCLogger Authentifizierung
Getrennte Login-Seiten für Site-Admins und normale Nutzer/Gruppen-Admins.
"""
import json
from datetime import datetime
from flask import Blueprint, render_template, request, redirect, url_for, session, flash
from panel_db import (
accept_group_invite, check_login, get_invite_by_token, get_user_groups,
log_audit_event, get_user_consent_version, set_user_consent,
)
from config import Config
from limiter import limiter
auth = Blueprint("auth", __name__)
# ── DSGVO-Einwilligungs-Check ─────────────────────────────────
# Routen, die ohne Zustimmung erreichbar sein müssen:
_CONSENT_EXEMPT = frozenset({
"auth.consent", "auth.logout", "auth.login", "auth.admin_login",
"auth.accept_invite", "privacy_policy", "static",
})
@auth.before_app_request
def require_consent():
"""Leitet angemeldete Nutzer auf die Zustimmungsseite, solange sie der
aktuellen Datenschutzerklärung noch nicht zugestimmt haben."""
if request.endpoint in _CONSENT_EXEMPT:
return
user_id = session.get("user_id")
if not user_id:
return
# Site-Admins sind ebenfalls einwilligungspflichtig
if session.get("needs_consent"):
return redirect(url_for("auth.consent"))
@auth.route("/consent", methods=["GET", "POST"])
def consent():
user_id = session.get("user_id")
if not user_id:
return redirect(url_for("auth.login"))
if request.method == "POST":
action = request.form.get("action")
if action == "accept":
set_user_consent(user_id, Config.PRIVACY_POLICY_VERSION)
log_audit_event(
user_id, session.get("username"), "consent.given",
details={"policy_version": Config.PRIVACY_POLICY_VERSION},
ip_address=request.remote_addr,
)
session.pop("needs_consent", None)
# Nach Zustimmung weiterleiten
if session.get("is_site_admin"):
return redirect(url_for("site_admin.dashboard"))
return redirect(url_for("panel.dashboard"))
else:
# Ablehnen → ausloggen
log_audit_event(
user_id, session.get("username"), "consent.declined",
details={"policy_version": Config.PRIVACY_POLICY_VERSION},
ip_address=request.remote_addr,
)
session.clear()
flash("You must accept the Privacy Policy to use this service.", "warning")
return redirect(url_for("auth.login"))
return render_template(
"auth/consent.html",
policy_version=Config.PRIVACY_POLICY_VERSION,
)
@auth.route("/login", methods=["GET", "POST"])
@limiter.limit("15 per minute", methods=["POST"])
def login():
if session.get("user_id"):
return redirect(url_for("panel.dashboard"))
error = None
if request.method == "POST":
username = request.form.get("username", "")
user = check_login(username, request.form.get("password", ""))
if user and user["is_site_admin"]:
flash("Please use the Site Admin login.", "warning")
return redirect(url_for("auth.admin_login"))
if user:
groups = get_user_groups(user["id"])
if not groups:
error = "You are not assigned to any group. Please contact an admin."
else:
_set_user_session(user, groups)
log_audit_event(
user["id"], user["username"], "user.login",
entity_type="user", entity_id=user["id"],
ip_address=request.remote_addr,
)
# DSGVO: Zustimmung prüfen
if get_user_consent_version(user["id"]) != Config.PRIVACY_POLICY_VERSION:
session["needs_consent"] = True
return redirect(url_for("auth.consent"))
return redirect(url_for("panel.dashboard"))
else:
log_audit_event(
None, None, "user.login_failed",
details={"username": username},
ip_address=request.remote_addr,
)
error = "Incorrect username or password."
return render_template("auth/login.html", error=error)
@auth.route("/admin/login", methods=["GET", "POST"])
@limiter.limit("10 per minute", methods=["POST"])
def admin_login():
if session.get("is_site_admin"):
return redirect(url_for("site_admin.dashboard"))
error = None
if request.method == "POST":
username = request.form.get("username", "")
user = check_login(username, request.form.get("password", ""))
if user and user["is_site_admin"]:
session["user_id"] = user["id"]
session["username"] = user["username"]
session["is_site_admin"] = True
session["group_id"] = None
session["permissions"] = {}
log_audit_event(
user["id"], user["username"], "admin.login",
entity_type="user", entity_id=user["id"],
ip_address=request.remote_addr,
)
# DSGVO: Zustimmung prüfen
if get_user_consent_version(user["id"]) != Config.PRIVACY_POLICY_VERSION:
session["needs_consent"] = True
return redirect(url_for("auth.consent"))
return redirect(url_for("site_admin.dashboard"))
elif user:
log_audit_event(
user["id"], user["username"], "admin.login_failed",
details={"reason": "no_admin_privileges"},
ip_address=request.remote_addr,
)
error = "No Site Admin privileges."
else:
log_audit_event(
None, None, "admin.login_failed",
details={"username": username},
ip_address=request.remote_addr,
)
error = "Incorrect username or password."
return render_template("auth/admin_login.html", error=error)
@auth.route("/logout", methods=["POST"])
def logout():
user_id = session.get("user_id")
username = session.get("username")
if user_id:
log_audit_event(user_id, username, "session.logout", ip_address=request.remote_addr)
session.clear()
return redirect(url_for("auth.login"))
@auth.route("/switch-group/<int:group_id>", methods=["POST"])
def switch_group(group_id):
if not session.get("user_id") or session.get("is_site_admin"):
return redirect(url_for("auth.login"))
user_id = session["user_id"]
groups = get_user_groups(user_id)
target = next((g for g in groups if g["id"] == group_id), None)
if not target:
flash("Group not found or no access.", "danger")
return redirect(url_for("panel.dashboard"))
_apply_group(target)
return redirect(url_for("panel.dashboard"))
@auth.route("/invite/<token>", methods=["GET", "POST"])
@limiter.limit("20 per minute", methods=["POST"])
def accept_invite(token):
if session.get("user_id"):
return redirect(url_for("panel.dashboard"))
invite = get_invite_by_token(token)
if not invite:
flash("Invitation not found.", "danger")
return redirect(url_for("auth.login"))
is_expired = invite["expires_at"] <= datetime.utcnow()
is_invalid = bool(invite.get("accepted_at") or invite.get("revoked_at") or is_expired)
error = None
if request.method == "POST" and not is_invalid:
password = request.form.get("password", "")
confirm_password = request.form.get("confirm_password", "")
if len(password) < 8:
error = "Password must be at least 8 characters long."
elif password != confirm_password:
error = "Passwords do not match."
else:
result = accept_group_invite(token, password)
if result is None:
flash("Invitation is no longer valid.", "danger")
return redirect(url_for("auth.login"))
if result.get("error") == "username_or_email_taken":
error = "The invited username or email is already in use. Please contact your administrator."
else:
log_audit_event(
result.get("user_id"), invite["invited_username"],
"invite.accepted",
entity_type="invite", entity_id=invite["id"],
details={"group_id": invite.get("group_id"), "role": invite.get("role")},
group_id=invite.get("group_id"),
ip_address=request.remote_addr,
)
flash("Your account has been created. You can now sign in.", "success")
return redirect(url_for("auth.login"))
return render_template("auth/accept_invite.html", invite=invite, is_invalid=is_invalid, is_expired=is_expired, error=error)
def _set_user_session(user, groups):
session["user_id"] = user["id"]
session["username"] = user["username"]
session["is_site_admin"] = False
_apply_group(groups[0]) # Erste Gruppe als Standard
def _apply_group(group):
raw = group.get("permissions")
if isinstance(raw, str):
perms = json.loads(raw)
elif isinstance(raw, dict):
perms = raw
else:
perms = {}
session["group_id"] = group["id"]
session["group_name"] = group["name"]
session["role"] = group.get("role", "viewer")
session["permissions"] = perms