231 lines
8.4 KiB
Python
231 lines
8.4 KiB
Python
"""
|
||
MCLogger – Flask Web-Panel
|
||
Multi-Tenant mit Gruppen, Rollen & verschlüsselten DB-Zugangsdaten.
|
||
Coolify-kompatibel: alle Einstellungen via ENV.
|
||
"""
|
||
import os
|
||
import secrets
|
||
from datetime import datetime
|
||
from flask import Flask, abort, render_template, request, session, url_for
|
||
from werkzeug.middleware.proxy_fix import ProxyFix
|
||
from config import Config
|
||
from panel_db import init_databases, get_user_groups, get_group_member
|
||
from roles import can_manage_group
|
||
from limiter import limiter
|
||
|
||
from blueprints.auth import auth
|
||
from blueprints.site_admin import site_admin
|
||
from blueprints.group_admin import group_admin
|
||
from blueprints.panel import panel
|
||
|
||
|
||
def create_app() -> Flask:
    """Build and configure the MCLogger Flask application.

    The factory performs these steps in order:

    1. Derive ``Config.PRIVACY_POLICY_VERSION`` from the privacy-policy
       template hash unless ``PRIVACY_POLICY_VERSION`` is set in the
       environment.
    2. Apply the secret key and session-cookie settings, then call
       ``Config.validate_security()``.
    3. Wrap the WSGI app in ``ProxyFix`` when ``Config.PROXY_COUNT > 0``.
    4. Register the blueprints and initialise the rate limiter.
    5. Call ``init_databases()``; a failure is logged, not raised.
    6. Install CSRF enforcement, the session-role refresh hook, security
       headers, error handlers, template filters and the context processor.

    Returns:
        The configured ``Flask`` instance.
    """
    # Function-scope import: only the session-refresh hook below needs json.
    import json

    app = Flask(__name__)

    # ── Derive the privacy-policy version from the template hash ──────────
    # If PRIVACY_POLICY_VERSION is not set via ENV, the SHA-256 of the
    # template content is computed and its first 6 characters are used as
    # the version. When the page content changes, the hash changes →
    # all users have to consent again at their next login.
    if not os.getenv("PRIVACY_POLICY_VERSION"):
        import hashlib

        policy_path = os.path.join(app.root_path, "templates", "privacy_policy.html")
        try:
            with open(policy_path, "rb") as policy_file:
                digest = hashlib.sha256(policy_file.read()).hexdigest()
        except OSError as exc:
            # Fall back to the Config default, but leave a trace in the log
            # instead of failing silently.
            app.logger.warning(
                "Could not hash privacy policy template %s: %s", policy_path, exc
            )
        else:
            Config.PRIVACY_POLICY_VERSION = digest[:6].upper()

    # ── Core configuration ────────────────────────────────────────────────
    app.secret_key = Config.SECRET_KEY
    app.config.update(
        SESSION_COOKIE_HTTPONLY=Config.SESSION_COOKIE_HTTPONLY,
        SESSION_COOKIE_SAMESITE=Config.SESSION_COOKIE_SAMESITE,
        SESSION_COOKIE_SECURE=Config.SESSION_COOKIE_SECURE,
    )

    Config.validate_security()

    # Reverse proxy: read the real client IP from X-Forwarded-For.
    if Config.PROXY_COUNT > 0:
        app.wsgi_app = ProxyFix(
            app.wsgi_app,
            x_for=Config.PROXY_COUNT,
            x_proto=Config.PROXY_COUNT,
            x_host=Config.PROXY_COUNT,
        )

    # ── Blueprints ────────────────────────────────────────────────────────
    app.register_blueprint(auth)
    app.register_blueprint(site_admin)
    app.register_blueprint(group_admin)
    app.register_blueprint(panel)

    # ── Rate limiter ──────────────────────────────────────────────────────
    limiter.init_app(app)

    @app.errorhandler(429)
    def rate_limit_exceeded(e):
        """Render the 429 page with a retry hint in seconds (default 60)."""
        retry_after = getattr(e, "retry_after", None)
        try:
            seconds = int(retry_after) if retry_after else 60
        except (TypeError, ValueError):
            # retry_after may not be a plain number (Werkzeug also allows a
            # datetime); use the default rather than failing the handler.
            seconds = 60
        return render_template("429.html", retry_after=seconds), 429

    # ── Create the panel database tables ──────────────────────────────────
    try:
        init_databases()
    except Exception as e:
        # Best effort: the app must still start when the DB is not yet
        # configured, so the failure is only logged.
        app.logger.warning(
            "DB-Initialisierung fehlgeschlagen (noch nicht konfiguriert?): %s", e
        )

    # ── CSRF protection ───────────────────────────────────────────────────

    def _get_or_create_csrf_token() -> str:
        """Return the session's CSRF token, creating and storing one if absent."""
        token = session.get("_csrf_token")
        if not token:
            token = secrets.token_urlsafe(32)
            session["_csrf_token"] = token
        return token

    @app.before_request
    def enforce_csrf():
        """Abort state-changing requests whose CSRF token is missing or wrong.

        The token is read from the ``_csrf_token`` form field or, failing
        that, from the ``X-CSRF-Token`` header, and must match the token
        stored in the session. Mismatches abort with HTTP 400.
        """
        if request.method not in {"POST", "PUT", "PATCH", "DELETE"}:
            return

        session_token = session.get("_csrf_token")
        request_token = request.form.get("_csrf_token") or request.headers.get("X-CSRF-Token")
        if not session_token or not request_token:
            abort(400)

        # Constant-time comparison; compare bytes because compare_digest
        # rejects non-ASCII str values, which a client could otherwise send.
        tokens_match = secrets.compare_digest(
            str(session_token).encode("utf-8"),
            str(request_token).encode("utf-8"),
        )
        if not tokens_match:
            abort(400)

    # ── Session synchronisation ───────────────────────────────────────────

    @app.before_request
    def refresh_session_role():
        """Keep the session's role/permissions in sync with the DB.

        Runs on every request so role changes by an admin take effect
        immediately without requiring the affected user to re-login.
        """
        user_id = session.get("user_id")
        group_id = session.get("group_id")

        # Only for regular panel users (not site-admin-only sessions,
        # not admin-viewing-group sessions, not unauthenticated requests).
        if not user_id or session.get("is_site_admin") or session.get("admin_viewing"):
            return
        if not group_id:
            return

        try:
            member = get_group_member(user_id, group_id)
            if not member:
                # User was removed from the group — clear their group context.
                for key in ("group_id", "group_name", "role", "permissions"):
                    session.pop(key, None)
                return

            raw = member.get("permissions")
            if isinstance(raw, str):
                raw = json.loads(raw)
            # Anything that is not a mapping (None, list, ...) becomes {}.
            perms = raw if isinstance(raw, dict) else {}

            session["role"] = member["role"]
            session["permissions"] = perms
        except Exception as exc:
            # DB unavailable or malformed data — keep the existing session
            # as-is, but do not hide the failure.
            app.logger.warning(
                "Session role refresh failed; keeping existing session: %s", exc
            )

    # ── Security headers ──────────────────────────────────────────────────

    content_security_policy = (
        "default-src 'self'; "
        "script-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
        "style-src 'self' 'unsafe-inline' https://cdn.jsdelivr.net; "
        "img-src 'self' data: https://minotar.net; "
        "font-src 'self' https://cdn.jsdelivr.net; "
        "connect-src 'self'; "
        "frame-ancestors 'none';"
    )

    @app.after_request
    def set_security_headers(resp):
        """Add default security headers without overriding ones already set."""
        resp.headers.setdefault("X-Content-Type-Options", "nosniff")
        resp.headers.setdefault("X-Frame-Options", "DENY")
        resp.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
        resp.headers.setdefault("Content-Security-Policy", content_security_policy)
        return resp

    # ── Routes and error handlers ─────────────────────────────────────────

    @app.route("/privacy-policy")
    def privacy_policy():
        """Render the privacy policy with the values taken from Config."""
        return render_template(
            "privacy_policy.html",
            last_updated="April 15, 2026",
            invite_expiry_hours=Config.INVITE_EXPIRY_HOURS,
            audit_retention_days=Config.AUDIT_LOG_RETENTION_DAYS,
            policy_version=Config.PRIVACY_POLICY_VERSION,
        )

    @app.errorhandler(400)
    def bad_request(_):
        """Return a plain-text 400 response."""
        return "Bad request", 400

    @app.errorhandler(404)
    def not_found(_):
        """Render the 404 page with navigation links suited to the session."""
        uid = session.get("user_id")
        is_site_admin = bool(session.get("is_site_admin"))
        role = session.get("role")

        links = []
        if not uid:
            # Anonymous visitor: offer both login entry points.
            links = [
                {"label": "Login", "href": url_for("auth.login"), "btn": "btn-success"},
                {"label": "Site Admin Login", "href": url_for("auth.admin_login"), "btn": "btn-outline-danger"},
            ]
        elif is_site_admin and not session.get("group_id"):
            # Site admin without a group context.
            links = [
                {"label": "Site Admin Dashboard", "href": url_for("site_admin.dashboard"), "btn": "btn-danger"},
            ]
        else:
            links.append({"label": "Panel Dashboard", "href": url_for("panel.dashboard"), "btn": "btn-success"})
            if is_site_admin:
                links.append({"label": "Site Admin", "href": url_for("site_admin.dashboard"), "btn": "btn-outline-danger"})
            if can_manage_group(role) and not is_site_admin:
                links.append({"label": "Group Admin", "href": url_for("group_admin.dashboard"), "btn": "btn-outline-warning"})

        return render_template(
            "404.html",
            requested_path=request.path,
            request_method=request.method,
            links=links,
            is_logged_in=bool(uid),
            is_site_admin=is_site_admin,
            role=role,
        ), 404

    # ── Template filters ──────────────────────────────────────────────────

    @app.template_filter("fmt_duration")
    def fmt_duration(seconds):
        """Format a duration in seconds as ``"1h 2m"``, ``"2m 3s"`` or ``"3s"``.

        ``None`` and values that cannot be converted to an integer render as
        ``"—"``. Negative durations are formatted from their absolute value
        with a leading minus sign.
        """
        if seconds is None:
            return "—"
        try:
            total = int(seconds)
        except (TypeError, ValueError, OverflowError):
            return "—"

        # Split off the sign first: floor division on a negative number
        # would otherwise yield results such as "-1h 59m" for -5 seconds.
        sign = "-" if total < 0 else ""
        hours, remainder = divmod(abs(total), 3600)
        minutes, secs = divmod(remainder, 60)

        if hours:
            return f"{sign}{hours}h {minutes}m"
        if minutes:
            return f"{sign}{minutes}m {secs}s"
        return f"{sign}{secs}s"

    @app.template_filter("fmt_dt")
    def fmt_dt(dt):
        """Format a datetime as ``DD.MM.YYYY HH:MM:SS``.

        ``None`` renders as ``"—"``; strings are returned unchanged.
        """
        if dt is None:
            return "—"
        if isinstance(dt, str):
            return dt
        return dt.strftime("%d.%m.%Y %H:%M:%S")

    # ── Template context ──────────────────────────────────────────────────

    @app.context_processor
    def inject_globals():
        """Expose common values (time, version, groups, CSRF helper) to templates."""
        uid = session.get("user_id")
        try:
            groups = get_user_groups(uid) if uid else []
        except Exception as exc:
            # Rendering must not fail because the group lookup did.
            app.logger.warning("Could not load groups for user %s: %s", uid, exc)
            groups = []
        return {
            "now": datetime.now(),
            "app_version": "2.0.0",
            "author": "SimolZimol",
            "user_groups": groups,
            "csrf_token": _get_or_create_csrf_token,
        }

    return app
|
||
|
||
|
||
# Module-level application object, built once when this module is imported.
app = create_app()

if __name__ == "__main__":
    # Direct execution: start the server with the host, port and debug
    # flag taken from Config.
    app.run(
        host=Config.HOST,
        port=Config.PORT,
        debug=Config.DEBUG,
    )
|
||
|