""" MCLogger – Flask Web-Panel Multi-Tenant mit Gruppen, Rollen & verschlüsselten DB-Zugangsdaten. Coolify-kompatibel: alle Einstellungen via ENV. """ import secrets from datetime import datetime from flask import Flask, abort, render_template, request, session, url_for from werkzeug.middleware.proxy_fix import ProxyFix from config import Config from panel_db import init_databases, get_user_groups, get_group_member from roles import can_manage_group from limiter import limiter from blueprints.auth import auth from blueprints.site_admin import site_admin from blueprints.group_admin import group_admin from blueprints.panel import panel def create_app() -> Flask: app = Flask(__name__) app.secret_key = Config.SECRET_KEY app.config.update( SESSION_COOKIE_HTTPONLY=Config.SESSION_COOKIE_HTTPONLY, SESSION_COOKIE_SAMESITE=Config.SESSION_COOKIE_SAMESITE, SESSION_COOKIE_SECURE=Config.SESSION_COOKIE_SECURE, ) Config.validate_security() # Reverse-Proxy: echte Client-IP aus X-Forwarded-For lesen if Config.PROXY_COUNT > 0: app.wsgi_app = ProxyFix( app.wsgi_app, x_for=Config.PROXY_COUNT, x_proto=Config.PROXY_COUNT, x_host=Config.PROXY_COUNT, ) # Blueprints registrieren app.register_blueprint(auth) app.register_blueprint(site_admin) app.register_blueprint(group_admin) app.register_blueprint(panel) # Rate limiter limiter.init_app(app) @app.errorhandler(429) def rate_limit_exceeded(e): retry_after = getattr(e, "retry_after", None) return render_template( "429.html", retry_after=int(retry_after) if retry_after else 60, ), 429 # Panel-Datenbank-Tabellen anlegen try: init_databases() except Exception as e: app.logger.warning(f"DB-Initialisierung fehlgeschlagen (noch nicht konfiguriert?): {e}") # ── Template-Filter ─────────────────────────────────────── def _get_or_create_csrf_token() -> str: token = session.get("_csrf_token") if not token: token = secrets.token_urlsafe(32) session["_csrf_token"] = token return token @app.before_request def enforce_csrf(): if request.method not in {"POST", "PUT", "PATCH", "DELETE"}: return session_token = session.get("_csrf_token") request_token = request.form.get("_csrf_token") or request.headers.get("X-CSRF-Token") if not session_token or not request_token or session_token != request_token: abort(400) @app.before_request def refresh_session_role(): """Keeps session role/permissions in sync with the DB. Runs on every request so role changes by an admin take effect immediately without requiring the affected user to re-login.""" user_id = session.get("user_id") group_id = session.get("group_id") # Only for regular panel users (not site-admin-only sessions, # not admin-viewing-group sessions, not unauthenticated requests). if not user_id or session.get("is_site_admin") or session.get("admin_viewing"): return if not group_id: return try: member = get_group_member(user_id, group_id) if not member: # User was removed from the group — clear their group context session.pop("group_id", None) session.pop("group_name", None) session.pop("role", None) session.pop("permissions", None) return import json as _json raw = member.get("permissions") perms = ( raw if isinstance(raw, dict) else (_json.loads(raw) if isinstance(raw, str) else {}) ) session["role"] = member["role"] session["permissions"] = perms except Exception: pass # DB unavailable — keep existing session as-is @app.after_request def set_security_headers(resp): resp.headers.setdefault("X-Content-Type-Options", "nosniff") resp.headers.setdefault("X-Frame-Options", "DENY") resp.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin") resp.headers.setdefault("Content-Security-Policy", "default-src 'self'; script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; img-src 'self' data:; font-src 'self' https://cdn.jsdelivr.net; connect-src 'self'; frame-ancestors 'none';") return resp @app.route("/privacy-policy") def privacy_policy(): from config import Config return render_template( "privacy_policy.html", last_updated="April 14, 2026", invite_expiry_hours=Config.INVITE_EXPIRY_HOURS, ) @app.errorhandler(400) def bad_request(_): return "Bad request", 400 @app.errorhandler(404) def not_found(_): uid = session.get("user_id") is_site_admin = bool(session.get("is_site_admin")) role = session.get("role") links = [] if not uid: links = [ {"label": "Login", "href": url_for("auth.login"), "btn": "btn-success"}, {"label": "Site Admin Login", "href": url_for("auth.admin_login"), "btn": "btn-outline-danger"}, ] elif is_site_admin and not session.get("group_id"): links = [ {"label": "Site Admin Dashboard", "href": url_for("site_admin.dashboard"), "btn": "btn-danger"}, ] else: links.append({"label": "Panel Dashboard", "href": url_for("panel.dashboard"), "btn": "btn-success"}) if is_site_admin: links.append({"label": "Site Admin", "href": url_for("site_admin.dashboard"), "btn": "btn-outline-danger"}) if can_manage_group(role) and not is_site_admin: links.append({"label": "Group Admin", "href": url_for("group_admin.dashboard"), "btn": "btn-outline-warning"}) return render_template( "404.html", requested_path=request.path, request_method=request.method, links=links, is_logged_in=bool(uid), is_site_admin=is_site_admin, role=role, ), 404 @app.template_filter("fmt_duration") def fmt_duration(seconds): if seconds is None: return "—" seconds = int(seconds) h = seconds // 3600 m = (seconds % 3600) // 60 s = seconds % 60 if h: return f"{h}h {m}m" elif m: return f"{m}m {s}s" return f"{s}s" @app.template_filter("fmt_dt") def fmt_dt(dt): if dt is None: return "—" if isinstance(dt, str): return dt return dt.strftime("%d.%m.%Y %H:%M:%S") @app.context_processor def inject_globals(): uid = session.get("user_id") try: groups = get_user_groups(uid) if uid else [] except Exception: groups = [] return { "now": datetime.now(), "app_version": "2.0.0", "author": "SimolZimol", "user_groups": groups, "csrf_token": _get_or_create_csrf_token, } return app app = create_app() if __name__ == "__main__": app.run(host=Config.HOST, port=Config.PORT, debug=Config.DEBUG)