Files
MClogger/web/panel_db.py
simon 179a0e1042 modified: web/blueprints/auth.py
modified:   web/blueprints/group_admin.py
	modified:   web/blueprints/site_admin.py
	modified:   web/config.py
	modified:   web/panel_db.py
	modified:   web/templates/admin/audit_log.html
2026-04-15 10:48:37 +02:00

845 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MCLogger Panel-Datenbank-Operationen
Verwaltet Nutzer, Gruppen, Mitgliedschaften (PANEL_DB)
und verschlüsselte MC-DB-Zugangsdaten (CREDS_DB).
"""
import json
import secrets
from datetime import datetime, timedelta
import pymysql
import pymysql.cursors
from config import Config
from crypto import generate_salt, hash_password, verify_password, encrypt_str, decrypt_str
# ─────────────────────────────────────────────────────────────
# Datenbankverbindungen
# ─────────────────────────────────────────────────────────────
def get_panel_db():
return pymysql.connect(
host=Config.PANEL_DB_HOST,
port=Config.PANEL_DB_PORT,
user=Config.PANEL_DB_USER,
password=Config.PANEL_DB_PASSWORD,
database=Config.PANEL_DB_NAME,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)
def get_creds_db():
return pymysql.connect(
host=Config.CREDS_DB_HOST,
port=Config.CREDS_DB_PORT,
user=Config.CREDS_DB_USER,
password=Config.CREDS_DB_PASSWORD,
database=Config.CREDS_DB_NAME,
charset="utf8mb4",
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
)
def _panel_query(sql, args=None, fetchone=False, write=False):
conn = get_panel_db()
try:
with conn.cursor() as cur:
cur.execute(sql, args or ())
if write:
return cur.lastrowid
return cur.fetchone() if fetchone else cur.fetchall()
finally:
conn.close()
def _creds_query(sql, args=None, fetchone=False, write=False):
conn = get_creds_db()
try:
with conn.cursor() as cur:
cur.execute(sql, args or ())
if write:
return cur.lastrowid
return cur.fetchone() if fetchone else cur.fetchall()
finally:
conn.close()
# ─────────────────────────────────────────────────────────────
# Initialisierung Tabellen anlegen
# ─────────────────────────────────────────────────────────────
PANEL_SCHEMA = [
"""CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
salt VARCHAR(64) NOT NULL,
is_site_admin TINYINT(1) DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
"""CREATE TABLE IF NOT EXISTS user_groups (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) UNIQUE NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
"""CREATE TABLE IF NOT EXISTS group_members (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
group_id INT NOT NULL,
role ENUM('group_owner','group_admin','moderator','viewer','auditor','admin','member') DEFAULT 'viewer',
permissions JSON,
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uq_user_group (user_id, group_id),
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
FOREIGN KEY (group_id) REFERENCES user_groups(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
"""CREATE TABLE IF NOT EXISTS group_invites (
id INT AUTO_INCREMENT PRIMARY KEY,
group_id INT NULL,
is_site_admin TINYINT(1) NOT NULL DEFAULT 0,
invited_username VARCHAR(50) NOT NULL,
invited_email VARCHAR(255) NOT NULL,
role ENUM('group_owner','group_admin','moderator','viewer','auditor','admin','member') DEFAULT 'viewer',
token VARCHAR(128) UNIQUE NOT NULL,
created_by_user_id INT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at DATETIME NOT NULL,
last_sent_at DATETIME NULL,
send_count INT NOT NULL DEFAULT 0,
accepted_at DATETIME NULL,
revoked_at DATETIME NULL,
FOREIGN KEY (group_id) REFERENCES user_groups(id) ON DELETE CASCADE,
FOREIGN KEY (created_by_user_id) REFERENCES users(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
"""CREATE TABLE IF NOT EXISTS audit_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
actor_user_id INT NULL,
actor_username VARCHAR(50) NULL,
action VARCHAR(100) NOT NULL,
entity_type VARCHAR(50) NULL,
entity_id VARCHAR(100) NULL,
details JSON NULL,
group_id INT NULL,
ip_address VARCHAR(45) NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_audit_actor (actor_user_id),
INDEX idx_audit_group (group_id),
INDEX idx_audit_action (action),
INDEX idx_audit_ts (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
"""CREATE TABLE IF NOT EXISTS schema_migrations (
version INT PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
note VARCHAR(255) NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
]
# ─────────────────────────────────────────────────────────────
# Versioned migrations (applied once, tracked in schema_migrations)
# Each entry: (version_int, sql_statement, human_readable_note)
# ─────────────────────────────────────────────────────────────
PANEL_MIGRATIONS = [
(1,
"ALTER TABLE group_members MODIFY COLUMN role "
"ENUM('group_owner','group_admin','moderator','viewer','auditor','admin','member') DEFAULT 'viewer'",
"Extend group_members.role ENUM"),
(2,
"ALTER TABLE group_invites MODIFY COLUMN role "
"ENUM('group_owner','group_admin','moderator','viewer','auditor','admin','member') DEFAULT 'viewer'",
"Extend group_invites.role ENUM"),
(3,
"ALTER TABLE group_invites ADD COLUMN IF NOT EXISTS last_sent_at DATETIME NULL",
"Add group_invites.last_sent_at"),
(4,
"ALTER TABLE group_invites ADD COLUMN IF NOT EXISTS send_count INT NOT NULL DEFAULT 0",
"Add group_invites.send_count"),
(5,
"ALTER TABLE group_invites MODIFY COLUMN group_id INT NULL",
"Allow group_invites.group_id to be NULL"),
(6,
"ALTER TABLE group_invites ADD COLUMN IF NOT EXISTS is_site_admin TINYINT(1) NOT NULL DEFAULT 0",
"Add group_invites.is_site_admin"),
]
CREDS_SCHEMA = [
"""CREATE TABLE IF NOT EXISTS group_databases (
id INT AUTO_INCREMENT PRIMARY KEY,
group_id INT UNIQUE NOT NULL,
enc_host TEXT NOT NULL,
enc_port TEXT NOT NULL,
enc_user TEXT NOT NULL,
enc_password TEXT NOT NULL,
enc_database TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
"""CREATE TABLE IF NOT EXISTS site_mail_settings (
id INT AUTO_INCREMENT PRIMARY KEY,
config_key VARCHAR(50) UNIQUE NOT NULL,
enc_host TEXT NOT NULL,
enc_port TEXT NOT NULL,
enc_username TEXT NOT NULL,
enc_password TEXT NOT NULL,
enc_from_email TEXT NOT NULL,
enc_from_name TEXT,
enc_use_tls TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
]
def init_databases():
"""Creates all required tables and applies pending schema migrations."""
import logging
_log = logging.getLogger(__name__)
panel = get_panel_db()
try:
with panel.cursor() as cur:
# Create tables (idempotent)
for stmt in PANEL_SCHEMA:
cur.execute(stmt)
# Determine already-applied migration versions
cur.execute("SELECT version FROM schema_migrations")
applied = {row["version"] for row in cur.fetchall()}
for version, sql, note in PANEL_MIGRATIONS:
if version in applied:
continue
try:
cur.execute(sql)
cur.execute(
"INSERT IGNORE INTO schema_migrations (version, note) VALUES (%s, %s)",
(version, note),
)
_log.info("Migration %d applied: %s", version, note)
except Exception as exc:
_log.warning("Migration %d skipped (%s): %s", version, note, exc)
finally:
panel.close()
creds = get_creds_db()
try:
with creds.cursor() as cur:
for stmt in CREDS_SCHEMA:
cur.execute(stmt)
finally:
creds.close()
# Auto-Bereinigung: Audit-Log-Einträge älter als Retention-Tage löschen
purge_old_audit_events(Config.AUDIT_LOG_RETENTION_DAYS)
# ─────────────────────────────────────────────────────────────
# Nutzer
# ─────────────────────────────────────────────────────────────
def create_user(username: str, email: str, password: str, is_site_admin: bool = False) -> int:
salt = generate_salt()
pw_hash = hash_password(password, salt)
return _panel_query(
"INSERT INTO users (username, email, password_hash, salt, is_site_admin) VALUES (%s,%s,%s,%s,%s)",
(username, email, pw_hash, salt, int(is_site_admin)), write=True
)
def create_user_for_group(username: str, email: str, password: str, group_id: int, role: str = "viewer") -> int:
"""Create a non-site-admin user and assign them to a group atomically."""
permissions = Config.DEFAULT_PERMISSIONS
salt = generate_salt()
pw_hash = hash_password(password, salt)
conn = get_panel_db()
conn.autocommit(False)
try:
with conn.cursor() as cur:
cur.execute(
"INSERT INTO users (username, email, password_hash, salt, is_site_admin) VALUES (%s,%s,%s,%s,%s)",
(username, email, pw_hash, salt, 0),
)
user_id = cur.lastrowid
cur.execute(
"INSERT INTO group_members (user_id, group_id, role, permissions) VALUES (%s,%s,%s,%s)",
(user_id, group_id, role, json.dumps(permissions)),
)
conn.commit()
return user_id
except Exception:
conn.rollback()
raise
finally:
conn.close()
def create_group_invite(group_id, username: str, email: str, role: str, created_by_user_id: int, is_site_admin: bool = False) -> str:
expires_at = datetime.utcnow() + timedelta(hours=Config.INVITE_EXPIRY_HOURS)
token = secrets.token_urlsafe(32)
_panel_query(
"INSERT INTO group_invites (group_id, invited_username, invited_email, role, token, created_by_user_id, expires_at, last_sent_at, send_count, is_site_admin) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,NULL,0,%s)",
(group_id, username, email, role, token, created_by_user_id, expires_at, int(is_site_admin)),
write=True,
)
return token
def list_active_group_invites(group_id: int):
return _panel_query(
"SELECT gi.*, u.username AS created_by_username "
"FROM group_invites gi "
"JOIN users u ON u.id = gi.created_by_user_id "
"WHERE gi.group_id=%s AND gi.accepted_at IS NULL AND gi.revoked_at IS NULL AND gi.expires_at > UTC_TIMESTAMP() "
"ORDER BY gi.created_at DESC",
(group_id,),
)
def count_active_group_invites(group_id: int) -> int:
row = _panel_query(
"SELECT COUNT(*) AS c FROM group_invites WHERE group_id=%s AND accepted_at IS NULL AND revoked_at IS NULL AND expires_at > UTC_TIMESTAMP()",
(group_id,),
fetchone=True,
)
return int(row["c"]) if row else 0
def get_active_invite_by_email(group_id: int, email: str):
return _panel_query(
"SELECT * FROM group_invites WHERE group_id=%s AND invited_email=%s "
"AND accepted_at IS NULL AND revoked_at IS NULL AND expires_at > UTC_TIMESTAMP()",
(group_id, email),
fetchone=True,
)
def get_active_invite_by_username(group_id: int, username: str):
return _panel_query(
"SELECT * FROM group_invites WHERE group_id=%s AND invited_username=%s "
"AND accepted_at IS NULL AND revoked_at IS NULL AND expires_at > UTC_TIMESTAMP()",
(group_id, username),
fetchone=True,
)
def get_group_invite_by_id(invite_id: int, group_id: int):
return _panel_query(
"SELECT * FROM group_invites WHERE id=%s AND group_id=%s",
(invite_id, group_id),
fetchone=True,
)
def get_invite_by_token(token: str):
return _panel_query(
"SELECT gi.*, g.name AS group_name, u.username AS created_by_username "
"FROM group_invites gi "
"LEFT JOIN user_groups g ON g.id = gi.group_id "
"JOIN users u ON u.id = gi.created_by_user_id "
"WHERE gi.token=%s",
(token,),
fetchone=True,
)
def revoke_group_invite(invite_id: int, group_id: int):
_panel_query(
"UPDATE group_invites SET revoked_at=UTC_TIMESTAMP() WHERE id=%s AND group_id=%s AND accepted_at IS NULL AND revoked_at IS NULL",
(invite_id, group_id),
write=True,
)
def list_all_active_invites():
"""All pending invites across every group (for site admin users page)."""
return _panel_query(
"SELECT gi.*, g.name AS group_name, u.username AS created_by_username "
"FROM group_invites gi "
"LEFT JOIN user_groups g ON g.id = gi.group_id "
"JOIN users u ON u.id = gi.created_by_user_id "
"WHERE gi.accepted_at IS NULL AND gi.revoked_at IS NULL AND gi.expires_at > UTC_TIMESTAMP() "
"ORDER BY gi.created_at DESC"
)
def get_invite_by_id_global(invite_id: int):
return _panel_query(
"SELECT gi.*, g.name AS group_name, u.username AS created_by_username "
"FROM group_invites gi "
"LEFT JOIN user_groups g ON g.id = gi.group_id "
"JOIN users u ON u.id = gi.created_by_user_id "
"WHERE gi.id=%s",
(invite_id,),
fetchone=True,
)
def revoke_invite_global(invite_id: int):
_panel_query(
"UPDATE group_invites SET revoked_at=UTC_TIMESTAMP() WHERE id=%s AND accepted_at IS NULL AND revoked_at IS NULL",
(invite_id,),
write=True,
)
def mark_invite_sent_global(invite_id: int):
_panel_query(
"UPDATE group_invites SET last_sent_at=UTC_TIMESTAMP(), send_count=send_count+1 WHERE id=%s",
(invite_id,),
write=True,
)
def get_active_invite_by_email_global(email: str):
return _panel_query(
"SELECT * FROM group_invites WHERE invited_email=%s "
"AND accepted_at IS NULL AND revoked_at IS NULL AND expires_at > UTC_TIMESTAMP()",
(email,),
fetchone=True,
)
def get_active_invite_by_username_global(username: str):
return _panel_query(
"SELECT * FROM group_invites WHERE invited_username=%s "
"AND accepted_at IS NULL AND revoked_at IS NULL AND expires_at > UTC_TIMESTAMP()",
(username,),
fetchone=True,
)
def mark_group_invite_sent(invite_id: int, group_id: int):
_panel_query(
"UPDATE group_invites SET last_sent_at=UTC_TIMESTAMP(), send_count=send_count+1 WHERE id=%s AND group_id=%s",
(invite_id, group_id),
write=True,
)
def accept_group_invite(token: str, password: str) -> dict | None:
invite = get_invite_by_token(token)
if not invite:
return None
if invite.get("accepted_at") or invite.get("revoked_at"):
return None
if invite["expires_at"] <= datetime.utcnow():
return None
permissions = Config.DEFAULT_PERMISSIONS
salt = generate_salt()
pw_hash = hash_password(password, salt)
conn = get_panel_db()
conn.autocommit(False)
try:
with conn.cursor() as cur:
cur.execute("SELECT id FROM users WHERE username=%s OR email=%s", (invite["invited_username"], invite["invited_email"]))
if cur.fetchone():
conn.rollback()
return {"error": "username_or_email_taken"}
cur.execute(
"INSERT INTO users (username, email, password_hash, salt, is_site_admin) VALUES (%s,%s,%s,%s,%s)",
(invite["invited_username"], invite["invited_email"], pw_hash, salt, 0),
)
user_id = cur.lastrowid
site_admin_flag = int(bool(invite.get("is_site_admin")))
cur.execute(
"UPDATE users SET is_site_admin=%s WHERE id=%s",
(site_admin_flag, user_id),
)
if invite["group_id"] is not None:
cur.execute(
"INSERT INTO group_members (user_id, group_id, role, permissions) VALUES (%s,%s,%s,%s)",
(user_id, invite["group_id"], invite["role"], json.dumps(permissions)),
)
cur.execute(
"UPDATE group_invites SET accepted_at=UTC_TIMESTAMP() WHERE id=%s AND accepted_at IS NULL AND revoked_at IS NULL",
(invite["id"],),
)
conn.commit()
return {"user_id": user_id, "group_id": invite["group_id"]}
except Exception:
conn.rollback()
raise
finally:
conn.close()
def get_user_by_username(username: str):
return _panel_query("SELECT * FROM users WHERE username=%s", (username,), fetchone=True)
def get_user_by_email(email: str):
return _panel_query("SELECT * FROM users WHERE email=%s", (email,), fetchone=True)
def get_user_by_id(user_id: int):
return _panel_query("SELECT * FROM users WHERE id=%s", (user_id,), fetchone=True)
def update_user(user_id: int, username: str, email: str, is_site_admin: bool):
_panel_query(
"UPDATE users SET username=%s, email=%s, is_site_admin=%s WHERE id=%s",
(username, email, int(is_site_admin), user_id), write=True
)
def change_password(user_id: int, new_password: str):
salt = generate_salt()
pw_hash = hash_password(new_password, salt)
_panel_query(
"UPDATE users SET password_hash=%s, salt=%s WHERE id=%s",
(pw_hash, salt, user_id), write=True
)
def delete_user(user_id: int):
_panel_query("DELETE FROM users WHERE id=%s", (user_id,), write=True)
def check_login(username: str, password: str):
"""Prüft Anmeldedaten. Gibt den Nutzer zurück oder None."""
user = get_user_by_username(username)
if not user:
return None
if not verify_password(password, user["salt"], user["password_hash"]):
return None
_panel_query("UPDATE users SET last_login=NOW() WHERE id=%s", (user["id"],), write=True)
return user
def list_all_users():
return _panel_query(
"SELECT u.*, COUNT(gm.group_id) AS group_count "
"FROM users u LEFT JOIN group_members gm ON gm.user_id=u.id "
"GROUP BY u.id ORDER BY u.username"
)
# ─────────────────────────────────────────────────────────────
# Gruppen
# ─────────────────────────────────────────────────────────────
def create_group(name: str, description: str = "") -> int:
return _panel_query(
"INSERT INTO user_groups (name, description) VALUES (%s,%s)",
(name, description), write=True
)
def get_group_by_id(group_id: int):
return _panel_query("SELECT * FROM user_groups WHERE id=%s", (group_id,), fetchone=True)
def get_group_by_name(name: str):
return _panel_query("SELECT * FROM user_groups WHERE name=%s", (name,), fetchone=True)
def update_group(group_id: int, name: str, description: str):
_panel_query(
"UPDATE user_groups SET name=%s, description=%s WHERE id=%s",
(name, description, group_id), write=True
)
def delete_group(group_id: int):
_panel_query("DELETE FROM user_groups WHERE id=%s", (group_id,), write=True)
def list_all_groups():
return _panel_query(
"SELECT g.*, COUNT(gm.user_id) AS member_count "
"FROM user_groups g LEFT JOIN group_members gm ON gm.group_id=g.id "
"GROUP BY g.id ORDER BY g.name"
)
# ─────────────────────────────────────────────────────────────
# Gruppenmitgliedschaften
# ─────────────────────────────────────────────────────────────
def get_user_groups(user_id: int):
return _panel_query(
"SELECT g.*, gm.role, gm.permissions "
"FROM user_groups g "
"JOIN group_members gm ON gm.group_id=g.id "
"WHERE gm.user_id=%s ORDER BY g.name",
(user_id,)
)
def get_group_member(user_id: int, group_id: int):
return _panel_query(
"SELECT * FROM group_members WHERE user_id=%s AND group_id=%s",
(user_id, group_id), fetchone=True
)
def get_group_members(group_id: int):
return _panel_query(
"SELECT u.id, u.username, u.email, u.last_login, gm.role, gm.permissions, gm.joined_at "
"FROM group_members gm "
"JOIN users u ON u.id=gm.user_id "
"WHERE gm.group_id=%s ORDER BY gm.role DESC, u.username",
(group_id,)
)
def add_group_member(user_id: int, group_id: int, role: str = "viewer", permissions: dict = None):
if permissions is None:
permissions = Config.DEFAULT_PERMISSIONS
_panel_query(
"INSERT INTO group_members (user_id, group_id, role, permissions) VALUES (%s,%s,%s,%s) "
"ON DUPLICATE KEY UPDATE role=VALUES(role), permissions=VALUES(permissions)",
(user_id, group_id, role, json.dumps(permissions)), write=True
)
def update_member(user_id: int, group_id: int, role: str, permissions: dict):
_panel_query(
"UPDATE group_members SET role=%s, permissions=%s WHERE user_id=%s AND group_id=%s",
(role, json.dumps(permissions), user_id, group_id), write=True
)
def remove_group_member(user_id: int, group_id: int):
_panel_query(
"DELETE FROM group_members WHERE user_id=%s AND group_id=%s",
(user_id, group_id), write=True
)
def get_permissions(user_id: int, group_id: int) -> dict:
"""Gibt die Berechtigungen des Nutzers in der Gruppe zurück."""
member = get_group_member(user_id, group_id)
if not member:
return {}
raw = member.get("permissions")
if isinstance(raw, str):
return json.loads(raw)
if isinstance(raw, dict):
return raw
return {}
# ─────────────────────────────────────────────────────────────
# MC-Datenbank-Zugangsdaten (verschlüsselt)
# ─────────────────────────────────────────────────────────────
def set_group_db_creds(group_id: int, host: str, port: int, user: str, password: str, database: str):
"""Verschlüsselt und speichert die MC-DB-Zugangsdaten einer Gruppe."""
_creds_query(
"INSERT INTO group_databases (group_id, enc_host, enc_port, enc_user, enc_password, enc_database) "
"VALUES (%s,%s,%s,%s,%s,%s) "
"ON DUPLICATE KEY UPDATE enc_host=VALUES(enc_host), enc_port=VALUES(enc_port), "
"enc_user=VALUES(enc_user), enc_password=VALUES(enc_password), enc_database=VALUES(enc_database)",
(group_id,
encrypt_str(host),
encrypt_str(str(port)),
encrypt_str(user),
encrypt_str(password),
encrypt_str(database)),
write=True
)
def get_group_db_creds(group_id: int) -> dict | None:
"""Gibt die entschlüsselten MC-DB-Zugangsdaten zurück oder None."""
row = _creds_query(
"SELECT * FROM group_databases WHERE group_id=%s",
(group_id,), fetchone=True
)
if not row:
return None
return {
"host": decrypt_str(row["enc_host"]),
"port": int(decrypt_str(row["enc_port"])),
"user": decrypt_str(row["enc_user"]),
"password": decrypt_str(row["enc_password"]),
"database": decrypt_str(row["enc_database"]),
}
def delete_group_db_creds(group_id: int):
_creds_query("DELETE FROM group_databases WHERE group_id=%s", (group_id,), write=True)
def has_db_configured(group_id: int) -> bool:
row = _creds_query(
"SELECT id FROM group_databases WHERE group_id=%s",
(group_id,), fetchone=True
)
return row is not None
def set_site_mail_settings(host: str, port: int, username: str, password: str, from_email: str, from_name: str, use_tls: bool):
_creds_query(
"INSERT INTO site_mail_settings (config_key, enc_host, enc_port, enc_username, enc_password, enc_from_email, enc_from_name, enc_use_tls) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s) "
"ON DUPLICATE KEY UPDATE enc_host=VALUES(enc_host), enc_port=VALUES(enc_port), enc_username=VALUES(enc_username), "
"enc_password=VALUES(enc_password), enc_from_email=VALUES(enc_from_email), enc_from_name=VALUES(enc_from_name), enc_use_tls=VALUES(enc_use_tls)",
(
"primary",
encrypt_str(host),
encrypt_str(str(port)),
encrypt_str(username),
encrypt_str(password),
encrypt_str(from_email),
encrypt_str(from_name or ""),
encrypt_str("1" if use_tls else "0"),
),
write=True,
)
def get_site_mail_settings() -> dict | None:
row = _creds_query("SELECT * FROM site_mail_settings WHERE config_key=%s", ("primary",), fetchone=True)
if not row:
return None
return {
"host": decrypt_str(row["enc_host"]),
"port": int(decrypt_str(row["enc_port"])),
"username": decrypt_str(row["enc_username"]),
"password": decrypt_str(row["enc_password"]),
"from_email": decrypt_str(row["enc_from_email"]),
"from_name": decrypt_str(row["enc_from_name"]) if row.get("enc_from_name") else "",
"use_tls": decrypt_str(row["enc_use_tls"]) == "1",
"updated_at": row["updated_at"],
}
def delete_site_mail_settings():
_creds_query("DELETE FROM site_mail_settings WHERE config_key=%s", ("primary",), write=True)
def has_site_mail_settings() -> bool:
row = _creds_query("SELECT id FROM site_mail_settings WHERE config_key=%s", ("primary",), fetchone=True)
return row is not None
# ─────────────────────────────────────────────────────────────
# Audit-Log
# ─────────────────────────────────────────────────────────────
def log_audit_event(
actor_user_id,
actor_username: str | None,
action: str,
entity_type: str | None = None,
entity_id: str | None = None,
details: dict | None = None,
group_id: int | None = None,
ip_address: str | None = None,
):
"""Records an audit event. Never raises — audit log must not break the main flow."""
try:
_panel_query(
"INSERT INTO audit_log "
"(actor_user_id, actor_username, action, entity_type, entity_id, details, group_id, ip_address) "
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s)",
(
actor_user_id,
actor_username,
action,
entity_type,
str(entity_id) if entity_id is not None else None,
json.dumps(details) if details else None,
group_id,
ip_address,
),
write=True,
)
except Exception:
import logging
logging.getLogger(__name__).warning("Failed to write audit event: %s", action)
def get_audit_log(
page: int = 1,
per_page: int = 50,
action_filter: str | None = None,
group_id_filter: int | None = None,
actor_filter: str | None = None,
):
offset = (page - 1) * per_page
conditions: list[str] = []
args: list = []
if action_filter:
conditions.append("al.action LIKE %s")
args.append(f"%{action_filter}%")
if group_id_filter:
conditions.append("al.group_id = %s")
args.append(group_id_filter)
if actor_filter:
conditions.append("al.actor_username LIKE %s")
args.append(f"%{actor_filter}%")
where = ("WHERE " + " AND ".join(conditions)) if conditions else ""
count_row = _panel_query(
f"SELECT COUNT(*) AS c FROM audit_log al {where}", args, fetchone=True
)
total = int(count_row["c"]) if count_row else 0
rows = _panel_query(
f"SELECT al.*, g.name AS group_name "
f"FROM audit_log al "
f"LEFT JOIN user_groups g ON g.id = al.group_id "
f"{where} ORDER BY al.created_at DESC LIMIT %s OFFSET %s",
args + [per_page, offset],
)
# Ensure details is always a dict (pymysql may return JSON as string)
for row in (rows or []):
d = row.get("details")
if isinstance(d, str):
try:
row["details"] = json.loads(d)
except Exception:
row["details"] = {}
return rows, total
def get_audit_log_distinct_actions() -> list[str]:
rows = _panel_query("SELECT DISTINCT action FROM audit_log ORDER BY action")
return [r["action"] for r in rows] if rows else []
def purge_old_audit_events(retention_days: int) -> int:
"""Deletes audit log entries older than *retention_days* days.
Returns the number of deleted rows. Skips if retention_days <= 0."""
import logging
_log = logging.getLogger(__name__)
if retention_days <= 0:
return 0
try:
conn = get_panel_db()
try:
with conn.cursor() as cur:
cur.execute(
"DELETE FROM audit_log WHERE created_at < UTC_TIMESTAMP() - INTERVAL %s DAY",
(retention_days,),
)
deleted = cur.rowcount
conn.commit()
finally:
conn.close()
if deleted:
_log.info("Purged %d audit log entries older than %d days", deleted, retention_days)
return deleted
except Exception as exc:
_log.warning("Failed to purge audit log: %s", exc)
return 0