612 lines
23 KiB
Python
612 lines
23 KiB
Python
"""
|
||
MCLogger – Panel-Datenbank-Operationen
|
||
Verwaltet Nutzer, Gruppen, Mitgliedschaften (PANEL_DB)
|
||
und verschlüsselte MC-DB-Zugangsdaten (CREDS_DB).
|
||
"""
|
||
import json
|
||
import secrets
|
||
from datetime import datetime, timedelta
|
||
import pymysql
|
||
import pymysql.cursors
|
||
from config import Config
|
||
from crypto import generate_salt, hash_password, verify_password, encrypt_str, decrypt_str
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────
|
||
# Datenbankverbindungen
|
||
# ─────────────────────────────────────────────────────────────
|
||
|
||
def get_panel_db():
|
||
return pymysql.connect(
|
||
host=Config.PANEL_DB_HOST,
|
||
port=Config.PANEL_DB_PORT,
|
||
user=Config.PANEL_DB_USER,
|
||
password=Config.PANEL_DB_PASSWORD,
|
||
database=Config.PANEL_DB_NAME,
|
||
charset="utf8mb4",
|
||
cursorclass=pymysql.cursors.DictCursor,
|
||
autocommit=True,
|
||
)
|
||
|
||
|
||
def get_creds_db():
|
||
return pymysql.connect(
|
||
host=Config.CREDS_DB_HOST,
|
||
port=Config.CREDS_DB_PORT,
|
||
user=Config.CREDS_DB_USER,
|
||
password=Config.CREDS_DB_PASSWORD,
|
||
database=Config.CREDS_DB_NAME,
|
||
charset="utf8mb4",
|
||
cursorclass=pymysql.cursors.DictCursor,
|
||
autocommit=True,
|
||
)
|
||
|
||
|
||
def _panel_query(sql, args=None, fetchone=False, write=False):
|
||
conn = get_panel_db()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(sql, args or ())
|
||
if write:
|
||
return cur.lastrowid
|
||
return cur.fetchone() if fetchone else cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def _creds_query(sql, args=None, fetchone=False, write=False):
|
||
conn = get_creds_db()
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(sql, args or ())
|
||
if write:
|
||
return cur.lastrowid
|
||
return cur.fetchone() if fetchone else cur.fetchall()
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────
|
||
# Initialisierung – Tabellen anlegen
|
||
# ─────────────────────────────────────────────────────────────
|
||
|
||
PANEL_SCHEMA = [
|
||
"""CREATE TABLE IF NOT EXISTS users (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
username VARCHAR(50) UNIQUE NOT NULL,
|
||
email VARCHAR(255) UNIQUE NOT NULL,
|
||
password_hash VARCHAR(255) NOT NULL,
|
||
salt VARCHAR(64) NOT NULL,
|
||
is_site_admin TINYINT(1) DEFAULT 0,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
last_login TIMESTAMP NULL
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
|
||
|
||
"""CREATE TABLE IF NOT EXISTS user_groups (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
name VARCHAR(100) UNIQUE NOT NULL,
|
||
description TEXT,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
|
||
|
||
"""CREATE TABLE IF NOT EXISTS group_members (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
user_id INT NOT NULL,
|
||
group_id INT NOT NULL,
|
||
role ENUM('group_owner','group_admin','moderator','viewer','auditor','admin','member') DEFAULT 'viewer',
|
||
permissions JSON,
|
||
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
UNIQUE KEY uq_user_group (user_id, group_id),
|
||
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
|
||
FOREIGN KEY (group_id) REFERENCES user_groups(id) ON DELETE CASCADE
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
|
||
|
||
"""CREATE TABLE IF NOT EXISTS group_invites (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
group_id INT NOT NULL,
|
||
invited_username VARCHAR(50) NOT NULL,
|
||
invited_email VARCHAR(255) NOT NULL,
|
||
role ENUM('group_owner','group_admin','moderator','viewer','auditor','admin','member') DEFAULT 'viewer',
|
||
token VARCHAR(128) UNIQUE NOT NULL,
|
||
created_by_user_id INT NOT NULL,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
expires_at DATETIME NOT NULL,
|
||
last_sent_at DATETIME NULL,
|
||
send_count INT NOT NULL DEFAULT 0,
|
||
accepted_at DATETIME NULL,
|
||
revoked_at DATETIME NULL,
|
||
UNIQUE KEY uq_group_pending_invite_email (group_id, invited_email, revoked_at, accepted_at),
|
||
FOREIGN KEY (group_id) REFERENCES user_groups(id) ON DELETE CASCADE,
|
||
FOREIGN KEY (created_by_user_id) REFERENCES users(id) ON DELETE CASCADE
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
|
||
]
|
||
|
||
CREDS_SCHEMA = [
|
||
"""CREATE TABLE IF NOT EXISTS group_databases (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
group_id INT UNIQUE NOT NULL,
|
||
enc_host TEXT NOT NULL,
|
||
enc_port TEXT NOT NULL,
|
||
enc_user TEXT NOT NULL,
|
||
enc_password TEXT NOT NULL,
|
||
enc_database TEXT NOT NULL,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
|
||
|
||
"""CREATE TABLE IF NOT EXISTS site_mail_settings (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
config_key VARCHAR(50) UNIQUE NOT NULL,
|
||
enc_host TEXT NOT NULL,
|
||
enc_port TEXT NOT NULL,
|
||
enc_username TEXT NOT NULL,
|
||
enc_password TEXT NOT NULL,
|
||
enc_from_email TEXT NOT NULL,
|
||
enc_from_name TEXT,
|
||
enc_use_tls TEXT NOT NULL,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
|
||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4""",
|
||
]
|
||
|
||
|
||
def init_databases():
|
||
"""Erstellt alle benötigten Tabellen falls nicht vorhanden."""
|
||
panel = get_panel_db()
|
||
try:
|
||
with panel.cursor() as cur:
|
||
for stmt in PANEL_SCHEMA:
|
||
cur.execute(stmt)
|
||
# Best-effort migrations for existing installs.
|
||
try:
|
||
cur.execute(
|
||
"ALTER TABLE group_members MODIFY role ENUM('group_owner','group_admin','moderator','viewer','auditor','admin','member') DEFAULT 'viewer'"
|
||
)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
cur.execute(
|
||
"ALTER TABLE group_invites MODIFY role ENUM('group_owner','group_admin','moderator','viewer','auditor','admin','member') DEFAULT 'viewer'"
|
||
)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
cur.execute("ALTER TABLE group_invites ADD COLUMN last_sent_at DATETIME NULL")
|
||
except Exception:
|
||
pass
|
||
try:
|
||
cur.execute("ALTER TABLE group_invites ADD COLUMN send_count INT NOT NULL DEFAULT 0")
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
panel.close()
|
||
|
||
creds = get_creds_db()
|
||
try:
|
||
with creds.cursor() as cur:
|
||
for stmt in CREDS_SCHEMA:
|
||
cur.execute(stmt)
|
||
finally:
|
||
creds.close()
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────
|
||
# Nutzer
|
||
# ─────────────────────────────────────────────────────────────
|
||
|
||
def create_user(username: str, email: str, password: str, is_site_admin: bool = False) -> int:
|
||
salt = generate_salt()
|
||
pw_hash = hash_password(password, salt)
|
||
return _panel_query(
|
||
"INSERT INTO users (username, email, password_hash, salt, is_site_admin) VALUES (%s,%s,%s,%s,%s)",
|
||
(username, email, pw_hash, salt, int(is_site_admin)), write=True
|
||
)
|
||
|
||
|
||
def create_user_for_group(username: str, email: str, password: str, group_id: int, role: str = "viewer") -> int:
|
||
"""Create a non-site-admin user and assign them to a group atomically."""
|
||
permissions = Config.DEFAULT_PERMISSIONS
|
||
salt = generate_salt()
|
||
pw_hash = hash_password(password, salt)
|
||
|
||
conn = get_panel_db()
|
||
conn.autocommit(False)
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute(
|
||
"INSERT INTO users (username, email, password_hash, salt, is_site_admin) VALUES (%s,%s,%s,%s,%s)",
|
||
(username, email, pw_hash, salt, 0),
|
||
)
|
||
user_id = cur.lastrowid
|
||
cur.execute(
|
||
"INSERT INTO group_members (user_id, group_id, role, permissions) VALUES (%s,%s,%s,%s)",
|
||
(user_id, group_id, role, json.dumps(permissions)),
|
||
)
|
||
conn.commit()
|
||
return user_id
|
||
except Exception:
|
||
conn.rollback()
|
||
raise
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def create_group_invite(group_id: int, username: str, email: str, role: str, created_by_user_id: int) -> str:
|
||
expires_at = datetime.utcnow() + timedelta(hours=Config.INVITE_EXPIRY_HOURS)
|
||
token = secrets.token_urlsafe(32)
|
||
_panel_query(
|
||
"INSERT INTO group_invites (group_id, invited_username, invited_email, role, token, created_by_user_id, expires_at, last_sent_at, send_count) "
|
||
"VALUES (%s,%s,%s,%s,%s,%s,%s,NULL,0)",
|
||
(group_id, username, email, role, token, created_by_user_id, expires_at),
|
||
write=True,
|
||
)
|
||
return token
|
||
|
||
|
||
def list_active_group_invites(group_id: int):
|
||
return _panel_query(
|
||
"SELECT gi.*, u.username AS created_by_username "
|
||
"FROM group_invites gi "
|
||
"JOIN users u ON u.id = gi.created_by_user_id "
|
||
"WHERE gi.group_id=%s AND gi.accepted_at IS NULL AND gi.revoked_at IS NULL AND gi.expires_at > UTC_TIMESTAMP() "
|
||
"ORDER BY gi.created_at DESC",
|
||
(group_id,),
|
||
)
|
||
|
||
|
||
def count_active_group_invites(group_id: int) -> int:
|
||
row = _panel_query(
|
||
"SELECT COUNT(*) AS c FROM group_invites WHERE group_id=%s AND accepted_at IS NULL AND revoked_at IS NULL AND expires_at > UTC_TIMESTAMP()",
|
||
(group_id,),
|
||
fetchone=True,
|
||
)
|
||
return int(row["c"]) if row else 0
|
||
|
||
|
||
def get_active_invite_by_email(group_id: int, email: str):
|
||
return _panel_query(
|
||
"SELECT * FROM group_invites WHERE group_id=%s AND invited_email=%s "
|
||
"AND accepted_at IS NULL AND revoked_at IS NULL AND expires_at > UTC_TIMESTAMP()",
|
||
(group_id, email),
|
||
fetchone=True,
|
||
)
|
||
|
||
|
||
def get_active_invite_by_username(group_id: int, username: str):
|
||
return _panel_query(
|
||
"SELECT * FROM group_invites WHERE group_id=%s AND invited_username=%s "
|
||
"AND accepted_at IS NULL AND revoked_at IS NULL AND expires_at > UTC_TIMESTAMP()",
|
||
(group_id, username),
|
||
fetchone=True,
|
||
)
|
||
|
||
|
||
def get_group_invite_by_id(invite_id: int, group_id: int):
|
||
return _panel_query(
|
||
"SELECT * FROM group_invites WHERE id=%s AND group_id=%s",
|
||
(invite_id, group_id),
|
||
fetchone=True,
|
||
)
|
||
|
||
|
||
def get_invite_by_token(token: str):
|
||
return _panel_query(
|
||
"SELECT gi.*, g.name AS group_name, u.username AS created_by_username "
|
||
"FROM group_invites gi "
|
||
"JOIN user_groups g ON g.id = gi.group_id "
|
||
"JOIN users u ON u.id = gi.created_by_user_id "
|
||
"WHERE gi.token=%s",
|
||
(token,),
|
||
fetchone=True,
|
||
)
|
||
|
||
|
||
def revoke_group_invite(invite_id: int, group_id: int):
|
||
_panel_query(
|
||
"UPDATE group_invites SET revoked_at=UTC_TIMESTAMP() WHERE id=%s AND group_id=%s AND accepted_at IS NULL AND revoked_at IS NULL",
|
||
(invite_id, group_id),
|
||
write=True,
|
||
)
|
||
|
||
|
||
def mark_group_invite_sent(invite_id: int, group_id: int):
|
||
_panel_query(
|
||
"UPDATE group_invites SET last_sent_at=UTC_TIMESTAMP(), send_count=send_count+1 WHERE id=%s AND group_id=%s",
|
||
(invite_id, group_id),
|
||
write=True,
|
||
)
|
||
|
||
|
||
def accept_group_invite(token: str, password: str) -> dict | None:
|
||
invite = get_invite_by_token(token)
|
||
if not invite:
|
||
return None
|
||
if invite.get("accepted_at") or invite.get("revoked_at"):
|
||
return None
|
||
if invite["expires_at"] <= datetime.utcnow():
|
||
return None
|
||
|
||
permissions = Config.DEFAULT_PERMISSIONS
|
||
salt = generate_salt()
|
||
pw_hash = hash_password(password, salt)
|
||
|
||
conn = get_panel_db()
|
||
conn.autocommit(False)
|
||
try:
|
||
with conn.cursor() as cur:
|
||
cur.execute("SELECT id FROM users WHERE username=%s OR email=%s", (invite["invited_username"], invite["invited_email"]))
|
||
if cur.fetchone():
|
||
conn.rollback()
|
||
return {"error": "username_or_email_taken"}
|
||
|
||
cur.execute(
|
||
"INSERT INTO users (username, email, password_hash, salt, is_site_admin) VALUES (%s,%s,%s,%s,%s)",
|
||
(invite["invited_username"], invite["invited_email"], pw_hash, salt, 0),
|
||
)
|
||
user_id = cur.lastrowid
|
||
cur.execute(
|
||
"INSERT INTO group_members (user_id, group_id, role, permissions) VALUES (%s,%s,%s,%s)",
|
||
(user_id, invite["group_id"], invite["role"], json.dumps(permissions)),
|
||
)
|
||
cur.execute(
|
||
"UPDATE group_invites SET accepted_at=UTC_TIMESTAMP() WHERE id=%s AND accepted_at IS NULL AND revoked_at IS NULL",
|
||
(invite["id"],),
|
||
)
|
||
conn.commit()
|
||
return {"user_id": user_id, "group_id": invite["group_id"]}
|
||
except Exception:
|
||
conn.rollback()
|
||
raise
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def get_user_by_username(username: str):
|
||
return _panel_query("SELECT * FROM users WHERE username=%s", (username,), fetchone=True)
|
||
|
||
|
||
def get_user_by_email(email: str):
|
||
return _panel_query("SELECT * FROM users WHERE email=%s", (email,), fetchone=True)
|
||
|
||
|
||
def get_user_by_id(user_id: int):
|
||
return _panel_query("SELECT * FROM users WHERE id=%s", (user_id,), fetchone=True)
|
||
|
||
|
||
def update_user(user_id: int, username: str, email: str, is_site_admin: bool):
|
||
_panel_query(
|
||
"UPDATE users SET username=%s, email=%s, is_site_admin=%s WHERE id=%s",
|
||
(username, email, int(is_site_admin), user_id), write=True
|
||
)
|
||
|
||
|
||
def change_password(user_id: int, new_password: str):
|
||
salt = generate_salt()
|
||
pw_hash = hash_password(new_password, salt)
|
||
_panel_query(
|
||
"UPDATE users SET password_hash=%s, salt=%s WHERE id=%s",
|
||
(pw_hash, salt, user_id), write=True
|
||
)
|
||
|
||
|
||
def delete_user(user_id: int):
|
||
_panel_query("DELETE FROM users WHERE id=%s", (user_id,), write=True)
|
||
|
||
|
||
def check_login(username: str, password: str):
|
||
"""Prüft Anmeldedaten. Gibt den Nutzer zurück oder None."""
|
||
user = get_user_by_username(username)
|
||
if not user:
|
||
return None
|
||
if not verify_password(password, user["salt"], user["password_hash"]):
|
||
return None
|
||
_panel_query("UPDATE users SET last_login=NOW() WHERE id=%s", (user["id"],), write=True)
|
||
return user
|
||
|
||
|
||
def list_all_users():
|
||
return _panel_query(
|
||
"SELECT u.*, COUNT(gm.group_id) AS group_count "
|
||
"FROM users u LEFT JOIN group_members gm ON gm.user_id=u.id "
|
||
"GROUP BY u.id ORDER BY u.username"
|
||
)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────
|
||
# Gruppen
|
||
# ─────────────────────────────────────────────────────────────
|
||
|
||
def create_group(name: str, description: str = "") -> int:
|
||
return _panel_query(
|
||
"INSERT INTO user_groups (name, description) VALUES (%s,%s)",
|
||
(name, description), write=True
|
||
)
|
||
|
||
|
||
def get_group_by_id(group_id: int):
|
||
return _panel_query("SELECT * FROM user_groups WHERE id=%s", (group_id,), fetchone=True)
|
||
|
||
|
||
def get_group_by_name(name: str):
|
||
return _panel_query("SELECT * FROM user_groups WHERE name=%s", (name,), fetchone=True)
|
||
|
||
|
||
def update_group(group_id: int, name: str, description: str):
|
||
_panel_query(
|
||
"UPDATE user_groups SET name=%s, description=%s WHERE id=%s",
|
||
(name, description, group_id), write=True
|
||
)
|
||
|
||
|
||
def delete_group(group_id: int):
|
||
_panel_query("DELETE FROM user_groups WHERE id=%s", (group_id,), write=True)
|
||
|
||
|
||
def list_all_groups():
|
||
return _panel_query(
|
||
"SELECT g.*, COUNT(gm.user_id) AS member_count "
|
||
"FROM user_groups g LEFT JOIN group_members gm ON gm.group_id=g.id "
|
||
"GROUP BY g.id ORDER BY g.name"
|
||
)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────
|
||
# Gruppenmitgliedschaften
|
||
# ─────────────────────────────────────────────────────────────
|
||
|
||
def get_user_groups(user_id: int):
|
||
return _panel_query(
|
||
"SELECT g.*, gm.role, gm.permissions "
|
||
"FROM user_groups g "
|
||
"JOIN group_members gm ON gm.group_id=g.id "
|
||
"WHERE gm.user_id=%s ORDER BY g.name",
|
||
(user_id,)
|
||
)
|
||
|
||
|
||
def get_group_member(user_id: int, group_id: int):
|
||
return _panel_query(
|
||
"SELECT * FROM group_members WHERE user_id=%s AND group_id=%s",
|
||
(user_id, group_id), fetchone=True
|
||
)
|
||
|
||
|
||
def get_group_members(group_id: int):
|
||
return _panel_query(
|
||
"SELECT u.id, u.username, u.email, u.last_login, gm.role, gm.permissions, gm.joined_at "
|
||
"FROM group_members gm "
|
||
"JOIN users u ON u.id=gm.user_id "
|
||
"WHERE gm.group_id=%s ORDER BY gm.role DESC, u.username",
|
||
(group_id,)
|
||
)
|
||
|
||
|
||
def add_group_member(user_id: int, group_id: int, role: str = "viewer", permissions: dict = None):
|
||
if permissions is None:
|
||
permissions = Config.DEFAULT_PERMISSIONS
|
||
_panel_query(
|
||
"INSERT INTO group_members (user_id, group_id, role, permissions) VALUES (%s,%s,%s,%s) "
|
||
"ON DUPLICATE KEY UPDATE role=VALUES(role), permissions=VALUES(permissions)",
|
||
(user_id, group_id, role, json.dumps(permissions)), write=True
|
||
)
|
||
|
||
|
||
def update_member(user_id: int, group_id: int, role: str, permissions: dict):
|
||
_panel_query(
|
||
"UPDATE group_members SET role=%s, permissions=%s WHERE user_id=%s AND group_id=%s",
|
||
(role, json.dumps(permissions), user_id, group_id), write=True
|
||
)
|
||
|
||
|
||
def remove_group_member(user_id: int, group_id: int):
|
||
_panel_query(
|
||
"DELETE FROM group_members WHERE user_id=%s AND group_id=%s",
|
||
(user_id, group_id), write=True
|
||
)
|
||
|
||
|
||
def get_permissions(user_id: int, group_id: int) -> dict:
|
||
"""Gibt die Berechtigungen des Nutzers in der Gruppe zurück."""
|
||
member = get_group_member(user_id, group_id)
|
||
if not member:
|
||
return {}
|
||
raw = member.get("permissions")
|
||
if isinstance(raw, str):
|
||
return json.loads(raw)
|
||
if isinstance(raw, dict):
|
||
return raw
|
||
return {}
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────
|
||
# MC-Datenbank-Zugangsdaten (verschlüsselt)
|
||
# ─────────────────────────────────────────────────────────────
|
||
|
||
def set_group_db_creds(group_id: int, host: str, port: int, user: str, password: str, database: str):
|
||
"""Verschlüsselt und speichert die MC-DB-Zugangsdaten einer Gruppe."""
|
||
_creds_query(
|
||
"INSERT INTO group_databases (group_id, enc_host, enc_port, enc_user, enc_password, enc_database) "
|
||
"VALUES (%s,%s,%s,%s,%s,%s) "
|
||
"ON DUPLICATE KEY UPDATE enc_host=VALUES(enc_host), enc_port=VALUES(enc_port), "
|
||
"enc_user=VALUES(enc_user), enc_password=VALUES(enc_password), enc_database=VALUES(enc_database)",
|
||
(group_id,
|
||
encrypt_str(host),
|
||
encrypt_str(str(port)),
|
||
encrypt_str(user),
|
||
encrypt_str(password),
|
||
encrypt_str(database)),
|
||
write=True
|
||
)
|
||
|
||
|
||
def get_group_db_creds(group_id: int) -> dict | None:
|
||
"""Gibt die entschlüsselten MC-DB-Zugangsdaten zurück oder None."""
|
||
row = _creds_query(
|
||
"SELECT * FROM group_databases WHERE group_id=%s",
|
||
(group_id,), fetchone=True
|
||
)
|
||
if not row:
|
||
return None
|
||
return {
|
||
"host": decrypt_str(row["enc_host"]),
|
||
"port": int(decrypt_str(row["enc_port"])),
|
||
"user": decrypt_str(row["enc_user"]),
|
||
"password": decrypt_str(row["enc_password"]),
|
||
"database": decrypt_str(row["enc_database"]),
|
||
}
|
||
|
||
|
||
def delete_group_db_creds(group_id: int):
|
||
_creds_query("DELETE FROM group_databases WHERE group_id=%s", (group_id,), write=True)
|
||
|
||
|
||
def has_db_configured(group_id: int) -> bool:
|
||
row = _creds_query(
|
||
"SELECT id FROM group_databases WHERE group_id=%s",
|
||
(group_id,), fetchone=True
|
||
)
|
||
return row is not None
|
||
|
||
|
||
def set_site_mail_settings(host: str, port: int, username: str, password: str, from_email: str, from_name: str, use_tls: bool):
|
||
_creds_query(
|
||
"INSERT INTO site_mail_settings (config_key, enc_host, enc_port, enc_username, enc_password, enc_from_email, enc_from_name, enc_use_tls) "
|
||
"VALUES (%s,%s,%s,%s,%s,%s,%s,%s) "
|
||
"ON DUPLICATE KEY UPDATE enc_host=VALUES(enc_host), enc_port=VALUES(enc_port), enc_username=VALUES(enc_username), "
|
||
"enc_password=VALUES(enc_password), enc_from_email=VALUES(enc_from_email), enc_from_name=VALUES(enc_from_name), enc_use_tls=VALUES(enc_use_tls)",
|
||
(
|
||
"primary",
|
||
encrypt_str(host),
|
||
encrypt_str(str(port)),
|
||
encrypt_str(username),
|
||
encrypt_str(password),
|
||
encrypt_str(from_email),
|
||
encrypt_str(from_name or ""),
|
||
encrypt_str("1" if use_tls else "0"),
|
||
),
|
||
write=True,
|
||
)
|
||
|
||
|
||
def get_site_mail_settings() -> dict | None:
|
||
row = _creds_query("SELECT * FROM site_mail_settings WHERE config_key=%s", ("primary",), fetchone=True)
|
||
if not row:
|
||
return None
|
||
return {
|
||
"host": decrypt_str(row["enc_host"]),
|
||
"port": int(decrypt_str(row["enc_port"])),
|
||
"username": decrypt_str(row["enc_username"]),
|
||
"password": decrypt_str(row["enc_password"]),
|
||
"from_email": decrypt_str(row["enc_from_email"]),
|
||
"from_name": decrypt_str(row["enc_from_name"]) if row.get("enc_from_name") else "",
|
||
"use_tls": decrypt_str(row["enc_use_tls"]) == "1",
|
||
"updated_at": row["updated_at"],
|
||
}
|
||
|
||
|
||
def delete_site_mail_settings():
|
||
_creds_query("DELETE FROM site_mail_settings WHERE config_key=%s", ("primary",), write=True)
|
||
|
||
|
||
def has_site_mail_settings() -> bool:
|
||
row = _creds_query("SELECT id FROM site_mail_settings WHERE config_key=%s", ("primary",), fetchone=True)
|
||
return row is not None
|