from flask import Flask, render_template, redirect, url_for, request, session, jsonify, send_file import os import subprocess import psutil import mysql.connector from datetime import datetime app = Flask(__name__) app.secret_key = os.getenv("FLASK_SECRET_KEY", "default_secret_key") LOG_FILE_PATH = os.path.join("logs", f"{datetime.now().strftime('%Y-%m-%d')}.log") # Verwende Umgebungsvariablen aus Coolify für die Datenbankverbindung DB_HOST = os.getenv("DB_HOST") DB_PORT = os.getenv("DB_PORT") DB_USER = os.getenv("DB_USER") DB_PASS = os.getenv("DB_PASSWORD") DB_NAME = os.getenv("DB_DATABASE") # Globale Variablen für die Intros INTRO_FILE = "introduction.txt" ASKNOTES_INTRO_FILE = "asknotesintro.txt" # Speichern der Prozess-ID bot_process = None def bot_status(): """Überprüft, ob der Bot läuft.""" global bot_process if bot_process is None: return False return bot_process.poll() is None # None bedeutet, dass der Prozess noch läuft def start_bot(): """Startet den Bot.""" global bot_process if not bot_status(): bot_process = subprocess.Popen(["python", "bot.py"], cwd=os.path.dirname(os.path.abspath(__file__))) else: print("Bot läuft bereits.") def stop_bot(): """Stoppt den Bot.""" global bot_process if bot_process and bot_status(): bot_process.terminate() bot_process.wait() # Warten, bis der Prozess beendet ist bot_process = None else: print("Bot läuft nicht.") def load_text_file(file_path): """Lädt den Inhalt einer Textdatei.""" if os.path.exists(file_path): with open(file_path, 'r', encoding='utf-8') as file: return file.read() return "" def save_text_file(file_path, content): """Speichert den Inhalt in einer Textdatei.""" with open(file_path, 'w', encoding='utf-8') as file: file.write(content) def get_db_connection(): """Stellt eine Verbindung zur MySQL-Datenbank her.""" return mysql.connector.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASS, database=DB_NAME ) @app.route("/") def index(): if "username" in session: return render_template("index.html", bot_running=bot_status()) return redirect(url_for("login")) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": username = request.form["username"] password = request.form["password"] if username == os.getenv("ADMIN_USER") and password == os.getenv("ADMIN_PASS"): session["username"] = username return redirect(url_for("index")) else: return "Invalid credentials!" return render_template("login.html") @app.route("/logout") def logout(): session.pop("username", None) return redirect(url_for("login")) @app.route("/start_bot") def start(): if "username" in session: start_bot() return redirect(url_for("index")) return redirect(url_for("login")) @app.route("/stop_bot") def stop(): if "username" in session: stop_bot() return redirect(url_for("index")) return redirect(url_for("login")) @app.route("/settings", methods=["GET", "POST"]) def settings(): if "username" in session: if request.method == "POST": introduction = request.form.get("introduction") asknotes_introduction = request.form.get("asknotes_introduction") # Speichern der Intros save_text_file(INTRO_FILE, introduction) save_text_file(ASKNOTES_INTRO_FILE, asknotes_introduction) return redirect(url_for("settings")) # Laden der aktuellen Inhalte aus den Textdateien introduction = load_text_file(INTRO_FILE) asknotes_introduction = load_text_file(ASKNOTES_INTRO_FILE) return render_template("settings.html", introduction=introduction, asknotes_introduction=asknotes_introduction) return redirect(url_for("login")) @app.route("/users") def users(): """Zeigt eine Liste aller Benutzer an.""" if "username" in session: connection = get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute("SELECT user_id, username, permission, points, ban FROM user_data") users = cursor.fetchall() cursor.close() connection.close() return render_template("users.html", users=users) return redirect(url_for("login")) @app.route("/ban_user/") def ban_user(user_id): """Banned einen Benutzer.""" if "username" in session: connection = get_db_connection() cursor = connection.cursor() try: cursor.execute("UPDATE user_data SET ban = 1 WHERE user_id = %s", (user_id,)) connection.commit() return redirect(url_for("users")) except Exception as e: print(f"Error banning user: {e}") connection.rollback() return redirect(url_for("users")) finally: cursor.close() connection.close() return redirect(url_for("login")) @app.route("/update_points/", methods=["POST"]) def update_points(user_id): """Aktualisiert die Punkte eines Benutzers.""" if "username" in session: points_change = int(request.form["points_change"]) connection = get_db_connection() cursor = connection.cursor() try: cursor.execute("UPDATE user_data SET points = points + %s WHERE user_id = %s", (points_change, user_id)) connection.commit() return redirect(url_for("users")) except Exception as e: print(f"Error updating points: {e}") connection.rollback() return redirect(url_for("users")) finally: cursor.close() connection.close() return redirect(url_for("login")) @app.route("/unban_user/") def unban_user(user_id): """Entbannt einen Benutzer.""" if "username" in session: connection = get_db_connection() cursor = connection.cursor() try: cursor.execute("UPDATE user_data SET ban = 0 WHERE user_id = %s", (user_id,)) connection.commit() return redirect(url_for("users")) except Exception as e: print(f"Error unbanning user: {e}") connection.rollback() return redirect(url_for("users")) finally: cursor.close() connection.close() return redirect(url_for("login")) @app.route("/update_role/", methods=["POST"]) def update_role(user_id): """Aktualisiert die Rolle (Berechtigung) eines Benutzers.""" if "username" in session: new_permission = request.form["permission"] connection = get_db_connection() cursor = connection.cursor() try: cursor.execute("UPDATE user_data SET permission = %s WHERE user_id = %s", (new_permission, user_id)) connection.commit() return redirect(url_for("users")) except Exception as e: print(f"Error updating role: {e}") connection.rollback() return redirect(url_for("users")) finally: cursor.close() connection.close() return redirect(url_for("login")) @app.route("/logs") def view_logs(): """Zeigt die Logs des Bots im Admin-Panel an.""" if "username" in session: return render_template("logs.html") return redirect(url_for("login")) @app.route("/get_logs") def get_logs(): """Liest den Inhalt der Log-Datei und gibt ihn zurück.""" if "username" in session: try: with open(LOG_FILE_PATH, 'r', encoding='utf-8') as file: logs = file.read() return jsonify({"logs": logs}) except FileNotFoundError: return jsonify({"logs": "Log file not found."}) return redirect(url_for("login")) def get_bot_statistics(): """Berechnet grundlegende Statistiken für den Bot.""" connection = get_db_connection() cursor = connection.cursor(dictionary=True) # Beispielabfragen, anpassen je nach Datenbankstruktur cursor.execute("SELECT COUNT(*) AS total_messages FROM chat_history") total_messages = cursor.fetchone()["total_messages"] cursor.execute(""" SELECT command_name, COUNT(*) AS usage_count FROM command_log GROUP BY command_name ORDER BY usage_count DESC LIMIT 1 """) most_used_command = cursor.fetchone()["command_name"] cursor.close() connection.close() return { "total_messages": total_messages, "most_used_command": most_used_command, } @app.route("/download_logs") def download_logs(): """Bietet die Log-Datei zum Download an.""" if "username" in session: return send_file(LOG_FILE_PATH, as_attachment=True) return redirect(url_for("login")) import shutil ARCHIVE_DIR = "archive_logs" if not os.path.exists(ARCHIVE_DIR): os.makedirs(ARCHIVE_DIR) @app.route("/archive_logs", methods=["POST"]) def archive_logs(): """Archiviert die aktuelle Log-Datei und beginnt eine neue Log-Datei.""" if "username" in session: if os.path.exists(LOG_FILE_PATH): timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') archive_file = os.path.join(ARCHIVE_DIR, f"log_{timestamp}.log") shutil.move(LOG_FILE_PATH, archive_file) with open(LOG_FILE_PATH, 'w', encoding='utf-8') as file: file.write("") # Neue leere Log-Datei starten return jsonify({"status": "success", "message": "Logs archived successfully."}) else: return jsonify({"status": "error", "message": "Log file not found."}) return redirect(url_for("login")) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)