Files
MClogger/web/panel_db.py
SimolZimol a2db1585de modified: web/blueprints/site_admin.py
modified:   web/panel_db.py
	modified:   web/templates/admin/user_edit.html
2026-04-01 03:04:38 +02:00

354 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MCLogger Panel-Datenbank-Operationen
Verwaltet Nutzer, Gruppen, Mitgliedschaften (PANEL_DB)
und verschlüsselte MC-DB-Zugangsdaten (CREDS_DB).
"""
import json
import pymysql
import pymysql.cursors
from config import Config
from crypto import generate_salt, hash_password, verify_password, encrypt_str, decrypt_str
# ─────────────────────────────────────────────────────────────
# Datenbankverbindungen
# ─────────────────────────────────────────────────────────────
def get_panel_db():
return pymysql.connect(
host=Config.PANEL_DB_HOST,
port=Config.PANEL_DB_PORT,
user=Config.PANEL_DB_USER,
password=Config.PANEL_DB_PASSWORD,
database=Config.PANEL_DB_NAME,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)
def get_creds_db():
return pymysql.connect(
host=Config.CREDS_DB_HOST,
port=Config.CREDS_DB_PORT,
user=Config.CREDS_DB_USER,
password=Config.CREDS_DB_PASSWORD,
database=Config.CREDS_DB_NAME,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)
def _panel_query(sql, args=None, fetchone=False, write=False):
conn = get_panel_db()
try:
with conn.cursor() as cur:
cur.execute(sql, args or ())
if write:
return cur.lastrowid
return cur.fetchone() if fetchone else cur.fetchall()
finally:
conn.close()
def _creds_query(sql, args=None, fetchone=False, write=False):
conn = get_creds_db()
try:
with conn.cursor() as cur:
cur.execute(sql, args or ())
if write:
return cur.lastrowid
return cur.fetchone() if fetchone else cur.fetchall()
finally:
conn.close()
# ─────────────────────────────────────────────────────────────
# Initialisierung Tabellen anlegen
# ─────────────────────────────────────────────────────────────
PANEL_SCHEMA = [
"""CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
salt VARCHAR(64) NOT NULL,
is_site_admin TINYINT(1) DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
"""CREATE TABLE IF NOT EXISTS user_groups (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
"""CREATE TABLE IF NOT EXISTS group_members (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
group_id INT NOT NULL,
role ENUM('admin','member') DEFAULT 'member',
permissions JSON,
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uq_user_group (user_id, group_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (group_id) REFERENCES user_groups(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
]
CREDS_SCHEMA = [
"""CREATE TABLE IF NOT EXISTS group_databases (
id INT AUTO_INCREMENT PRIMARY KEY,
group_id INT UNIQUE NOT NULL,
enc_host TEXT NOT NULL,
enc_port TEXT NOT NULL,
enc_user TEXT NOT NULL,
enc_password TEXT NOT NULL,
enc_database TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
]
def init_databases():
"""Erstellt alle benötigten Tabellen falls nicht vorhanden."""
panel = get_panel_db()
try:
with panel.cursor() as cur:
for stmt in PANEL_SCHEMA:
cur.execute(stmt)
finally:
panel.close()
creds = get_creds_db()
try:
with creds.cursor() as cur:
for stmt in CREDS_SCHEMA:
cur.execute(stmt)
finally:
creds.close()
# ─────────────────────────────────────────────────────────────
# Nutzer
# ─────────────────────────────────────────────────────────────
def create_user(username: str, email: str, password: str, is_site_admin: bool = False) -> int:
salt = generate_salt()
pw_hash = hash_password(password, salt)
return _panel_query(
"INSERT INTO users (username, email, password_hash, salt, is_site_admin) VALUES (%s,%s,%s,%s,%s)",
(username, email, pw_hash, salt, int(is_site_admin)), write=True
)
def get_user_by_username(username: str):
return _panel_query("SELECT * FROM users WHERE username=%s", (username,), fetchone=True)
def get_user_by_email(email: str):
return _panel_query("SELECT * FROM users WHERE email=%s", (email,), fetchone=True)
def get_user_by_id(user_id: int):
return _panel_query("SELECT * FROM users WHERE id=%s", (user_id,), fetchone=True)
def update_user(user_id: int, username: str, email: str, is_site_admin: bool):
_panel_query(
"UPDATE users SET username=%s, email=%s, is_site_admin=%s WHERE id=%s",
(username, email, int(is_site_admin), user_id), write=True
)
def change_password(user_id: int, new_password: str):
salt = generate_salt()
pw_hash = hash_password(new_password, salt)
_panel_query(
"UPDATE users SET password_hash=%s, salt=%s WHERE id=%s",
(pw_hash, salt, user_id), write=True
)
def delete_user(user_id: int):
_panel_query("DELETE FROM users WHERE id=%s", (user_id,), write=True)
def check_login(username: str, password: str):
"""Prüft Anmeldedaten. Gibt den Nutzer zurück oder None."""
user = get_user_by_username(username)
if not user:
return None
if not verify_password(password, user["salt"], user["password_hash"]):
return None
_panel_query("UPDATE users SET last_login=NOW() WHERE id=%s", (user["id"],), write=True)
return user
def list_all_users():
return _panel_query(
"SELECT u.*, COUNT(gm.group_id) AS group_count "
"FROM users u LEFT JOIN group_members gm ON gm.user_id=u.id "
"GROUP BY u.id ORDER BY u.username"
)
# ─────────────────────────────────────────────────────────────
# Gruppen
# ─────────────────────────────────────────────────────────────
def create_group(name: str, description: str = "") -> int:
return _panel_query(
"INSERT INTO user_groups (name, description) VALUES (%s,%s)",
(name, description), write=True
)
def get_group_by_id(group_id: int):
return _panel_query("SELECT * FROM user_groups WHERE id=%s", (group_id,), fetchone=True)
def get_group_by_name(name: str):
return _panel_query("SELECT * FROM user_groups WHERE name=%s", (name,), fetchone=True)
def update_group(group_id: int, name: str, description: str):
_panel_query(
"UPDATE user_groups SET name=%s, description=%s WHERE id=%s",
(name, description, group_id), write=True
)
def delete_group(group_id: int):
_panel_query("DELETE FROM user_groups WHERE id=%s", (group_id,), write=True)
def list_all_groups():
return _panel_query(
"SELECT g.*, COUNT(gm.user_id) AS member_count "
"FROM user_groups g LEFT JOIN group_members gm ON gm.group_id=g.id "
"GROUP BY g.id ORDER BY g.name"
)
# ─────────────────────────────────────────────────────────────
# Gruppenmitgliedschaften
# ─────────────────────────────────────────────────────────────
def get_user_groups(user_id: int):
return _panel_query(
"SELECT g.*, gm.role, gm.permissions "
"FROM user_groups g "
"JOIN group_members gm ON gm.group_id=g.id "
"WHERE gm.user_id=%s ORDER BY g.name",
(user_id,)
)
def get_group_member(user_id: int, group_id: int):
return _panel_query(
"SELECT * FROM group_members WHERE user_id=%s AND group_id=%s",
(user_id, group_id), fetchone=True
)
def get_group_members(group_id: int):
return _panel_query(
"SELECT u.id, u.username, u.email, u.last_login, gm.role, gm.permissions, gm.joined_at "
"FROM group_members gm "
"JOIN users u ON u.id=gm.user_id "
"WHERE gm.group_id=%s ORDER BY gm.role DESC, u.username",
(group_id,)
)
def add_group_member(user_id: int, group_id: int, role: str = "member", permissions: dict = None):
if permissions is None:
permissions = Config.DEFAULT_PERMISSIONS
_panel_query(
"INSERT INTO group_members (user_id, group_id, role, permissions) VALUES (%s,%s,%s,%s) "
"ON DUPLICATE KEY UPDATE role=VALUES(role), permissions=VALUES(permissions)",
(user_id, group_id, role, json.dumps(permissions)), write=True
)
def update_member(user_id: int, group_id: int, role: str, permissions: dict):
_panel_query(
"UPDATE group_members SET role=%s, permissions=%s WHERE user_id=%s AND group_id=%s",
(role, json.dumps(permissions), user_id, group_id), write=True
)
def remove_group_member(user_id: int, group_id: int):
_panel_query(
"DELETE FROM group_members WHERE user_id=%s AND group_id=%s",
(user_id, group_id), write=True
)
def get_permissions(user_id: int, group_id: int) -> dict:
"""Gibt die Berechtigungen des Nutzers in der Gruppe zurück."""
member = get_group_member(user_id, group_id)
if not member:
return {}
raw = member.get("permissions")
if isinstance(raw, str):
return json.loads(raw)
if isinstance(raw, dict):
return raw
return {}
# ─────────────────────────────────────────────────────────────
# MC-Datenbank-Zugangsdaten (verschlüsselt)
# ─────────────────────────────────────────────────────────────
def set_group_db_creds(group_id: int, host: str, port: int, user: str, password: str, database: str):
"""Verschlüsselt und speichert die MC-DB-Zugangsdaten einer Gruppe."""
_creds_query(
"INSERT INTO group_databases (group_id, enc_host, enc_port, enc_user, enc_password, enc_database) "
"VALUES (%s,%s,%s,%s,%s,%s) "
"ON DUPLICATE KEY UPDATE enc_host=VALUES(enc_host), enc_port=VALUES(enc_port), "
"enc_user=VALUES(enc_user), enc_password=VALUES(enc_password), enc_database=VALUES(enc_database)",
(group_id,
encrypt_str(host),
encrypt_str(str(port)),
encrypt_str(user),
encrypt_str(password),
encrypt_str(database)),
write=True
)
def get_group_db_creds(group_id: int) -> dict | None:
"""Gibt die entschlüsselten MC-DB-Zugangsdaten zurück oder None."""
row = _creds_query(
"SELECT * FROM group_databases WHERE group_id=%s",
(group_id,), fetchone=True
)
if not row:
return None
return {
"host": decrypt_str(row["enc_host"]),
"port": int(decrypt_str(row["enc_port"])),
"user": decrypt_str(row["enc_user"]),
"password": decrypt_str(row["enc_password"]),
"database": decrypt_str(row["enc_database"]),
}
def delete_group_db_creds(group_id: int):
_creds_query("DELETE FROM group_databases WHERE group_id=%s", (group_id,), write=True)
def has_db_configured(group_id: int) -> bool:
row = _creds_query(
"SELECT id FROM group_databases WHERE group_id=%s",
(group_id,), fetchone=True
)
return row is not None