""" MCLogger – Authentifizierung Getrennte Login-Seiten für Site-Admins und normale Nutzer/Gruppen-Admins. """ import json from datetime import datetime from flask import Blueprint, render_template, request, redirect, url_for, session, flash from panel_db import ( accept_group_invite, check_login, get_invite_by_token, get_user_groups, log_audit_event, get_user_consent_version, set_user_consent, ) from config import Config from limiter import limiter auth = Blueprint("auth", __name__) # ── DSGVO-Einwilligungs-Check ───────────────────────────────── # Routen, die ohne Zustimmung erreichbar sein müssen: _CONSENT_EXEMPT = frozenset({ "auth.consent", "auth.logout", "auth.login", "auth.admin_login", "auth.accept_invite", "privacy_policy", "static", }) @auth.before_app_request def require_consent(): """Leitet angemeldete Nutzer auf die Zustimmungsseite, solange sie der aktuellen Datenschutzerklärung noch nicht zugestimmt haben.""" if request.endpoint in _CONSENT_EXEMPT: return user_id = session.get("user_id") if not user_id: return # Site-Admins sind ebenfalls einwilligungspflichtig if session.get("needs_consent"): return redirect(url_for("auth.consent")) @auth.route("/consent", methods=["GET", "POST"]) def consent(): user_id = session.get("user_id") if not user_id: return redirect(url_for("auth.login")) if request.method == "POST": action = request.form.get("action") if action == "accept": set_user_consent(user_id, Config.PRIVACY_POLICY_VERSION) log_audit_event( user_id, session.get("username"), "consent.given", details={"policy_version": Config.PRIVACY_POLICY_VERSION}, ip_address=request.remote_addr, ) session.pop("needs_consent", None) # Nach Zustimmung weiterleiten if session.get("is_site_admin"): return redirect(url_for("site_admin.dashboard")) return redirect(url_for("panel.dashboard")) else: # Ablehnen → ausloggen log_audit_event( user_id, session.get("username"), "consent.declined", details={"policy_version": Config.PRIVACY_POLICY_VERSION}, ip_address=request.remote_addr, ) session.clear() flash("You must accept the Privacy Policy to use this service.", "warning") return redirect(url_for("auth.login")) return render_template( "auth/consent.html", policy_version=Config.PRIVACY_POLICY_VERSION, ) @auth.route("/login", methods=["GET", "POST"]) @limiter.limit("15 per minute", methods=["POST"]) def login(): if session.get("user_id"): return redirect(url_for("panel.dashboard")) error = None if request.method == "POST": username = request.form.get("username", "") user = check_login(username, request.form.get("password", "")) if user and user["is_site_admin"]: flash("Please use the Site Admin login.", "warning") return redirect(url_for("auth.admin_login")) if user: groups = get_user_groups(user["id"]) if not groups: error = "You are not assigned to any group. Please contact an admin." else: _set_user_session(user, groups) log_audit_event( user["id"], user["username"], "user.login", entity_type="user", entity_id=user["id"], ip_address=request.remote_addr, ) # DSGVO: Zustimmung prüfen if get_user_consent_version(user["id"]) != Config.PRIVACY_POLICY_VERSION: session["needs_consent"] = True return redirect(url_for("auth.consent")) return redirect(url_for("panel.dashboard")) else: log_audit_event( None, None, "user.login_failed", details={"username": username}, ip_address=request.remote_addr, ) error = "Incorrect username or password." return render_template("auth/login.html", error=error) @auth.route("/admin/login", methods=["GET", "POST"]) @limiter.limit("10 per minute", methods=["POST"]) def admin_login(): if session.get("is_site_admin"): return redirect(url_for("site_admin.dashboard")) error = None if request.method == "POST": username = request.form.get("username", "") user = check_login(username, request.form.get("password", "")) if user and user["is_site_admin"]: session["user_id"] = user["id"] session["username"] = user["username"] session["is_site_admin"] = True session["group_id"] = None session["permissions"] = {} log_audit_event( user["id"], user["username"], "admin.login", entity_type="user", entity_id=user["id"], ip_address=request.remote_addr, ) # DSGVO: Zustimmung prüfen if get_user_consent_version(user["id"]) != Config.PRIVACY_POLICY_VERSION: session["needs_consent"] = True return redirect(url_for("auth.consent")) return redirect(url_for("site_admin.dashboard")) elif user: log_audit_event( user["id"], user["username"], "admin.login_failed", details={"reason": "no_admin_privileges"}, ip_address=request.remote_addr, ) error = "No Site Admin privileges." else: log_audit_event( None, None, "admin.login_failed", details={"username": username}, ip_address=request.remote_addr, ) error = "Incorrect username or password." return render_template("auth/admin_login.html", error=error) @auth.route("/logout", methods=["POST"]) def logout(): user_id = session.get("user_id") username = session.get("username") if user_id: log_audit_event(user_id, username, "session.logout", ip_address=request.remote_addr) session.clear() return redirect(url_for("auth.login")) @auth.route("/switch-group/", methods=["POST"]) def switch_group(group_id): if not session.get("user_id") or session.get("is_site_admin"): return redirect(url_for("auth.login")) user_id = session["user_id"] groups = get_user_groups(user_id) target = next((g for g in groups if g["id"] == group_id), None) if not target: flash("Group not found or no access.", "danger") return redirect(url_for("panel.dashboard")) _apply_group(target) return redirect(url_for("panel.dashboard")) @auth.route("/invite/", methods=["GET", "POST"]) @limiter.limit("20 per minute", methods=["POST"]) def accept_invite(token): if session.get("user_id"): return redirect(url_for("panel.dashboard")) invite = get_invite_by_token(token) if not invite: flash("Invitation not found.", "danger") return redirect(url_for("auth.login")) is_expired = invite["expires_at"] <= datetime.utcnow() is_invalid = bool(invite.get("accepted_at") or invite.get("revoked_at") or is_expired) error = None if request.method == "POST" and not is_invalid: password = request.form.get("password", "") confirm_password = request.form.get("confirm_password", "") if len(password) < 8: error = "Password must be at least 8 characters long." elif password != confirm_password: error = "Passwords do not match." else: result = accept_group_invite(token, password) if result is None: flash("Invitation is no longer valid.", "danger") return redirect(url_for("auth.login")) if result.get("error") == "username_or_email_taken": error = "The invited username or email is already in use. Please contact your administrator." else: log_audit_event( result.get("user_id"), invite["invited_username"], "invite.accepted", entity_type="invite", entity_id=invite["id"], details={"group_id": invite.get("group_id"), "role": invite.get("role")}, group_id=invite.get("group_id"), ip_address=request.remote_addr, ) flash("Your account has been created. You can now sign in.", "success") return redirect(url_for("auth.login")) return render_template("auth/accept_invite.html", invite=invite, is_invalid=is_invalid, is_expired=is_expired, error=error) def _set_user_session(user, groups): session["user_id"] = user["id"] session["username"] = user["username"] session["is_site_admin"] = False _apply_group(groups[0]) # Erste Gruppe als Standard def _apply_group(group): raw = group.get("permissions") if isinstance(raw, str): perms = json.loads(raw) elif isinstance(raw, dict): perms = raw else: perms = {} session["group_id"] = group["id"] session["group_name"] = group["name"] session["role"] = group.get("role", "viewer") session["permissions"] = perms