""" MCLogger – Panel-Datenbank-Operationen Verwaltet Nutzer, Gruppen, Mitgliedschaften (PANEL_DB) und verschlüsselte MC-DB-Zugangsdaten (CREDS_DB). """ import json import pymysql import pymysql.cursors from config import Config from crypto import generate_salt, hash_password, verify_password, encrypt_str, decrypt_str # ───────────────────────────────────────────────────────────── # Datenbankverbindungen # ───────────────────────────────────────────────────────────── def get_panel_db(): return pymysql.connect( host=Config.PANEL_DB_HOST, port=Config.PANEL_DB_PORT, user=Config.PANEL_DB_USER, password=Config.PANEL_DB_PASSWORD, database=Config.PANEL_DB_NAME, charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor, autocommit=True, ) def get_creds_db(): return pymysql.connect( host=Config.CREDS_DB_HOST, port=Config.CREDS_DB_PORT, user=Config.CREDS_DB_USER, password=Config.CREDS_DB_PASSWORD, database=Config.CREDS_DB_NAME, charset="utf8mb4", cursorclass=pymysql.cursors.DictCursor, autocommit=True, ) def _panel_query(sql, args=None, fetchone=False, write=False): conn = get_panel_db() try: with conn.cursor() as cur: cur.execute(sql, args or ()) if write: return cur.lastrowid return cur.fetchone() if fetchone else cur.fetchall() finally: conn.close() def _creds_query(sql, args=None, fetchone=False, write=False): conn = get_creds_db() try: with conn.cursor() as cur: cur.execute(sql, args or ()) if write: return cur.lastrowid return cur.fetchone() if fetchone else cur.fetchall() finally: conn.close() # ───────────────────────────────────────────────────────────── # Initialisierung – Tabellen anlegen # ───────────────────────────────────────────────────────────── PANEL_SCHEMA = [ """CREATE TABLE IF NOT EXISTS users ( id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) UNIQUE NOT NULL, email VARCHAR(255) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, salt VARCHAR(64) NOT NULL, is_site_admin TINYINT(1) DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_login TIMESTAMP NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""", """CREATE TABLE IF NOT EXISTS user_groups ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100) UNIQUE NOT NULL, description TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""", """CREATE TABLE IF NOT EXISTS group_members ( id INT AUTO_INCREMENT PRIMARY KEY, user_id INT NOT NULL, group_id INT NOT NULL, role ENUM('admin','member') DEFAULT 'member', permissions JSON, joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE KEY uq_user_group (user_id, group_id), FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE, FOREIGN KEY (group_id) REFERENCES user_groups(id) ON DELETE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""", ] CREDS_SCHEMA = [ """CREATE TABLE IF NOT EXISTS group_databases ( id INT AUTO_INCREMENT PRIMARY KEY, group_id INT UNIQUE NOT NULL, enc_host TEXT NOT NULL, enc_port TEXT NOT NULL, enc_user TEXT NOT NULL, enc_password TEXT NOT NULL, enc_database TEXT NOT NULL, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""", ] def init_databases(): """Erstellt alle benötigten Tabellen falls nicht vorhanden.""" panel = get_panel_db() try: with panel.cursor() as cur: for stmt in PANEL_SCHEMA: cur.execute(stmt) finally: panel.close() creds = get_creds_db() try: with creds.cursor() as cur: for stmt in CREDS_SCHEMA: cur.execute(stmt) finally: creds.close() # ───────────────────────────────────────────────────────────── # Nutzer # ───────────────────────────────────────────────────────────── def create_user(username: str, email: str, password: str, is_site_admin: bool = False) -> int: salt = generate_salt() pw_hash = hash_password(password, salt) return _panel_query( "INSERT INTO users (username, email, password_hash, salt, is_site_admin) VALUES (%s,%s,%s,%s,%s)", (username, email, pw_hash, salt, int(is_site_admin)), write=True ) def get_user_by_username(username: str): return _panel_query("SELECT * FROM users WHERE username=%s", (username,), fetchone=True) def get_user_by_id(user_id: int): return _panel_query("SELECT * FROM users WHERE id=%s", (user_id,), fetchone=True) def update_user(user_id: int, username: str, email: str, is_site_admin: bool): _panel_query( "UPDATE users SET username=%s, email=%s, is_site_admin=%s WHERE id=%s", (username, email, int(is_site_admin), user_id), write=True ) def change_password(user_id: int, new_password: str): salt = generate_salt() pw_hash = hash_password(new_password, salt) _panel_query( "UPDATE users SET password_hash=%s, salt=%s WHERE id=%s", (pw_hash, salt, user_id), write=True ) def delete_user(user_id: int): _panel_query("DELETE FROM users WHERE id=%s", (user_id,), write=True) def check_login(username: str, password: str): """Prüft Anmeldedaten. Gibt den Nutzer zurück oder None.""" user = get_user_by_username(username) if not user: return None if not verify_password(password, user["salt"], user["password_hash"]): return None _panel_query("UPDATE users SET last_login=NOW() WHERE id=%s", (user["id"],), write=True) return user def list_all_users(): return _panel_query( "SELECT u.*, COUNT(gm.group_id) AS group_count " "FROM users u LEFT JOIN group_members gm ON gm.user_id=u.id " "GROUP BY u.id ORDER BY u.username" ) # ───────────────────────────────────────────────────────────── # Gruppen # ───────────────────────────────────────────────────────────── def create_group(name: str, description: str = "") -> int: return _panel_query( "INSERT INTO user_groups (name, description) VALUES (%s,%s)", (name, description), write=True ) def get_group_by_id(group_id: int): return _panel_query("SELECT * FROM user_groups WHERE id=%s", (group_id,), fetchone=True) def get_group_by_name(name: str): return _panel_query("SELECT * FROM user_groups WHERE name=%s", (name,), fetchone=True) def update_group(group_id: int, name: str, description: str): _panel_query( "UPDATE user_groups SET name=%s, description=%s WHERE id=%s", (name, description, group_id), write=True ) def delete_group(group_id: int): _panel_query("DELETE FROM user_groups WHERE id=%s", (group_id,), write=True) def list_all_groups(): return _panel_query( "SELECT g.*, COUNT(gm.user_id) AS member_count " "FROM user_groups g LEFT JOIN group_members gm ON gm.group_id=g.id " "GROUP BY g.id ORDER BY g.name" ) # ───────────────────────────────────────────────────────────── # Gruppenmitgliedschaften # ───────────────────────────────────────────────────────────── def get_user_groups(user_id: int): return _panel_query( "SELECT g.*, gm.role, gm.permissions " "FROM user_groups g " "JOIN group_members gm ON gm.group_id=g.id " "WHERE gm.user_id=%s ORDER BY g.name", (user_id,) ) def get_group_member(user_id: int, group_id: int): return _panel_query( "SELECT * FROM group_members WHERE user_id=%s AND group_id=%s", (user_id, group_id), fetchone=True ) def get_group_members(group_id: int): return _panel_query( "SELECT u.id, u.username, u.email, u.last_login, gm.role, gm.permissions, gm.joined_at " "FROM group_members gm " "JOIN users u ON u.id=gm.user_id " "WHERE gm.group_id=%s ORDER BY gm.role DESC, u.username", (group_id,) ) def add_group_member(user_id: int, group_id: int, role: str = "member", permissions: dict = None): if permissions is None: permissions = Config.DEFAULT_PERMISSIONS _panel_query( "INSERT INTO group_members (user_id, group_id, role, permissions) VALUES (%s,%s,%s,%s) " "ON DUPLICATE KEY UPDATE role=VALUES(role), permissions=VALUES(permissions)", (user_id, group_id, role, json.dumps(permissions)), write=True ) def update_member(user_id: int, group_id: int, role: str, permissions: dict): _panel_query( "UPDATE group_members SET role=%s, permissions=%s WHERE user_id=%s AND group_id=%s", (role, json.dumps(permissions), user_id, group_id), write=True ) def remove_group_member(user_id: int, group_id: int): _panel_query( "DELETE FROM group_members WHERE user_id=%s AND group_id=%s", (user_id, group_id), write=True ) def get_permissions(user_id: int, group_id: int) -> dict: """Gibt die Berechtigungen des Nutzers in der Gruppe zurück.""" member = get_group_member(user_id, group_id) if not member: return {} raw = member.get("permissions") if isinstance(raw, str): return json.loads(raw) if isinstance(raw, dict): return raw return {} # ───────────────────────────────────────────────────────────── # MC-Datenbank-Zugangsdaten (verschlüsselt) # ───────────────────────────────────────────────────────────── def set_group_db_creds(group_id: int, host: str, port: int, user: str, password: str, database: str): """Verschlüsselt und speichert die MC-DB-Zugangsdaten einer Gruppe.""" _creds_query( "INSERT INTO group_databases (group_id, enc_host, enc_port, enc_user, enc_password, enc_database) " "VALUES (%s,%s,%s,%s,%s,%s) " "ON DUPLICATE KEY UPDATE enc_host=VALUES(enc_host), enc_port=VALUES(enc_port), " "enc_user=VALUES(enc_user), enc_password=VALUES(enc_password), enc_database=VALUES(enc_database)", (group_id, encrypt_str(host), encrypt_str(str(port)), encrypt_str(user), encrypt_str(password), encrypt_str(database)), write=True ) def get_group_db_creds(group_id: int) -> dict | None: """Gibt die entschlüsselten MC-DB-Zugangsdaten zurück oder None.""" row = _creds_query( "SELECT * FROM group_databases WHERE group_id=%s", (group_id,), fetchone=True ) if not row: return None return { "host": decrypt_str(row["enc_host"]), "port": int(decrypt_str(row["enc_port"])), "user": decrypt_str(row["enc_user"]), "password": decrypt_str(row["enc_password"]), "database": decrypt_str(row["enc_database"]), } def delete_group_db_creds(group_id: int): _creds_query("DELETE FROM group_databases WHERE group_id=%s", (group_id,), write=True) def has_db_configured(group_id: int) -> bool: row = _creds_query( "SELECT id FROM group_databases WHERE group_id=%s", (group_id,), fetchone=True ) return row is not None