"""Flask admin web panel for the Discord chat-AI bot.

The panel can start and stop the bot process (``bot.py``), edit the two
introduction text files, manage rows of the MySQL ``user_data`` table and
show, download or archive the bot's log file.

Two sign-in paths exist:

* ``/login``          - form login checked against ``ADMIN_USER`` /
  ``ADMIN_PASS`` from the environment.
* ``/login/discord``  - Discord OAuth2; ``/callback`` then checks the
  user's ``permission`` column in ``user_data``.

Both paths store ``username`` in the session, which is what every
protected view checks.
"""

__version__ = "dev-0.4.6"
__title__ = "Discordbot-chatai-webpanel (Discord)"
__author__ = "SimolZimol"
# __all__ must only list names that exist in the module; the display title
# that used to live here is kept as __title__ instead.
__all__ = ["app"]

import hmac
import html
import os
import secrets
import shutil
import subprocess
import sys
from contextlib import contextmanager
from datetime import datetime
from functools import wraps
from urllib.parse import urlencode

import mysql.connector
import psutil
import requests
from flask import (
    Flask,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    session,
    url_for,
)

app = Flask(__name__)
# NOTE(review): if FLASK_SECRET_KEY is unset this is None and Flask cannot
# sign the session cookie - make sure the variable is always provided.
app.secret_key = os.getenv("FLASK_SECRET_KEY")

# NOTE(review): the date is fixed when the module is imported, so a panel
# that runs past midnight keeps pointing at the start day's file. Confirm
# against how the bot names its log before changing this.
LOG_FILE_PATH = os.path.join("logs", f"{datetime.now().strftime('%Y-%m-%d')}.log")
ARCHIVE_DIR = "archive_logs"
os.makedirs(ARCHIVE_DIR, exist_ok=True)

# Database connection settings, taken from (Coolify) environment variables.
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_DATABASE")
DEFAULT_DB_PORT = 3306

DISCORD_CLIENT_ID = os.getenv("DISCORD_CLIENT_ID")
DISCORD_CLIENT_SECRET = os.getenv("DISCORD_CLIENT_SECRET")
DISCORD_REDIRECT_URI = os.getenv("DISCORD_REDIRECT_URI")
DISCORD_API_BASE_URL = "https://discord.com/api"

# Minimum value of user_data.permission that is treated as "Admin or higher".
ADMIN_PERMISSION_LEVEL = 8
# Upper bound for outgoing HTTP calls so a stalled request cannot hang a worker.
HTTP_TIMEOUT_SECONDS = 10
# How long stop_bot() waits after terminate() before killing the process.
BOT_STOP_TIMEOUT_SECONDS = 10

# Files that hold the introduction texts.
INTRO_FILE = "introduction.txt"
ASKNOTES_INTRO_FILE = "asknotesintro.txt"

# Handle of the bot subprocess started by this panel (None when not started).
bot_process = None


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def get_db_connection():
    """Open and return a new connection to the MySQL database.

    The port comes from ``DB_PORT`` (an environment string) and falls back to
    ``DEFAULT_DB_PORT`` when the variable is unset or empty.
    """
    return mysql.connector.connect(
        host=DB_HOST,
        port=int(DB_PORT) if DB_PORT else DEFAULT_DB_PORT,
        user=DB_USER,
        password=DB_PASS,
        database=DB_NAME,
    )


@contextmanager
def _db_cursor(dictionary=False):
    """Yield ``(connection, cursor)`` and always close both afterwards."""
    connection = get_db_connection()
    try:
        cursor = connection.cursor(dictionary=dictionary)
        try:
            yield connection, cursor
        finally:
            cursor.close()
    finally:
        connection.close()


def _secure_equals(expected, provided):
    """Compare two strings in constant time; ``None`` never matches."""
    if expected is None or provided is None:
        return False
    return hmac.compare_digest(expected.encode("utf-8"), provided.encode("utf-8"))


def _json_or_error(response):
    """Return the decoded JSON body, or an error dict if it is not JSON."""
    try:
        return response.json()
    except ValueError:
        return {"error": "invalid_response"}


def login_required(view):
    """Redirect to the login page unless ``username`` is in the session."""
    @wraps(view)  # keeps the view's name, which Flask uses as endpoint name
    def wrapper(*args, **kwargs):
        if "username" not in session:
            return redirect(url_for("login"))
        return view(*args, **kwargs)
    return wrapper


def _run_user_update(query, params, error_label):
    """Run one UPDATE on ``user_data`` and redirect to the user list.

    A database error is printed as ``"<error_label>: <error>"`` and rolled
    back; the redirect happens either way.
    """
    with _db_cursor() as (connection, cursor):
        try:
            cursor.execute(query, params)
            connection.commit()
        except mysql.connector.Error as e:
            print(f"{error_label}: {e}")
            connection.rollback()
    return redirect(url_for("users"))


def bot_status():
    """Return True while the bot subprocess started by this panel is running."""
    if bot_process is None:
        return False
    return bot_process.poll() is None  # poll() is None while still running


def start_bot():
    """Start ``bot.py`` next to this file unless it is already running."""
    global bot_process
    if bot_status():
        print("Bot läuft bereits.")
        return
    bot_process = subprocess.Popen(
        # Same interpreter as the panel; plain "python" may be missing from
        # PATH or be a different environment.
        [sys.executable or "python", "bot.py"],
        cwd=os.path.dirname(os.path.abspath(__file__)),
    )


def stop_bot():
    """Terminate the bot subprocess and wait for it to exit."""
    global bot_process
    if not (bot_process and bot_status()):
        print("Bot läuft nicht.")
        return
    bot_process.terminate()
    try:
        bot_process.wait(timeout=BOT_STOP_TIMEOUT_SECONDS)
    except subprocess.TimeoutExpired:
        # The process ignored terminate(); do not block the request forever.
        bot_process.kill()
        bot_process.wait()
    bot_process = None


def load_text_file(file_path):
    """Return the content of a UTF-8 text file, or "" if it does not exist."""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except FileNotFoundError:
        return ""


def save_text_file(file_path, content):
    """Overwrite a UTF-8 text file with ``content``."""
    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(content)


def get_bot_statistics():
    """Return basic bot statistics read from the database.

    Returns:
        dict with ``total_messages`` (row count of ``chat_history``) and
        ``most_used_command`` (most frequent ``command_name`` in
        ``command_log``, or ``None`` when that table is empty).
    """
    # Example queries; adapt them to the actual database schema.
    with _db_cursor(dictionary=True) as (_connection, cursor):
        cursor.execute("SELECT COUNT(*) AS total_messages FROM chat_history")
        total_messages = cursor.fetchone()["total_messages"]

        cursor.execute("""
            SELECT command_name, COUNT(*) AS usage_count
            FROM command_log
            GROUP BY command_name
            ORDER BY usage_count DESC
            LIMIT 1
        """)
        top_row = cursor.fetchone()  # None when command_log has no rows

    return {
        "total_messages": total_messages,
        "most_used_command": top_row["command_name"] if top_row else None,
    }


# ---------------------------------------------------------------------------
# Authentication
# ---------------------------------------------------------------------------

@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or check the admin credentials (POST)."""
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]
        # Evaluate both checks so timing does not reveal which one failed.
        user_ok = _secure_equals(os.getenv("ADMIN_USER"), username)
        pass_ok = _secure_equals(os.getenv("ADMIN_PASS"), password)
        if user_ok and pass_ok:
            session["username"] = username
            return redirect(url_for("index"))
        return "Invalid credentials!", 401
    return render_template("login.html")


@app.route("/login/discord")
def discord_login():
    """Redirect the browser to Discord's OAuth2 authorization page."""
    # Random per-login value, verified in callback() to block forged callbacks.
    state = secrets.token_urlsafe(32)
    session["oauth_state"] = state
    query = urlencode({
        "client_id": DISCORD_CLIENT_ID,
        "redirect_uri": DISCORD_REDIRECT_URI,
        "response_type": "code",
        "scope": "identify",
        "state": state,
    })
    return redirect(f"{DISCORD_API_BASE_URL}/oauth2/authorize?{query}")


@app.route("/callback")
def callback():
    """Handle the OAuth2 callback from Discord and log the user in.

    Returns a redirect to the dashboard on success, 400/502 when the Discord
    API calls fail and 403 when the state check fails or the user is unknown
    or below ``ADMIN_PERMISSION_LEVEL``.
    """
    expected_state = session.pop("oauth_state", None)
    code = request.args.get("code")
    if code is None:
        return redirect(url_for("login"))
    if not _secure_equals(expected_state, request.args.get("state")):
        return "Access Denied: Invalid OAuth state.", 403

    # Step 1: exchange the authorization code for an access token.
    token_request = {
        "client_id": DISCORD_CLIENT_ID,
        "client_secret": DISCORD_CLIENT_SECRET,
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": DISCORD_REDIRECT_URI,
    }
    try:
        token_response = requests.post(
            f"{DISCORD_API_BASE_URL}/oauth2/token",
            data=token_request,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=HTTP_TIMEOUT_SECONDS,
        )
    except requests.RequestException as e:
        print(f"Discord token request failed: {e}")
        return jsonify({"error": "discord_unreachable"}), 502
    token_data = _json_or_error(token_response)
    access_token = token_data.get("access_token")
    if token_response.status_code != 200 or not access_token:
        return jsonify(token_data), 400

    # Step 2: use the access token to fetch the user's info from Discord.
    try:
        user_response = requests.get(
            f"{DISCORD_API_BASE_URL}/users/@me",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=HTTP_TIMEOUT_SECONDS,
        )
    except requests.RequestException as e:
        print(f"Discord user request failed: {e}")
        return jsonify({"error": "discord_unreachable"}), 502
    user_data = _json_or_error(user_response)
    if user_response.status_code != 200 or "id" not in user_data:
        return jsonify(user_data), 400

    discord_user_id = user_data["id"]
    discord_username = user_data.get("username")

    # Step 3: check the user's permission in the database.
    with _db_cursor(dictionary=True) as (_connection, cursor):
        cursor.execute(
            "SELECT permission FROM user_data WHERE user_id = %s",
            (discord_user_id,),
        )
        result = cursor.fetchone()

    if result is None:
        return "Access Denied: You are not registered in the database.", 403

    user_permission = result["permission"]
    # A NULL permission is treated as "no permission".
    if user_permission is None or user_permission < ADMIN_PERMISSION_LEVEL:
        return "Access Denied: You do not have sufficient permissions.", 403

    # Step 4: log the user in.
    session["user_id"] = discord_user_id
    session["username"] = discord_username
    session["permission"] = user_permission
    return redirect(url_for("index"))


@app.route("/logout")
def logout():
    """Drop the whole session (both login paths) and go to the login page."""
    session.clear()
    return redirect(url_for("login"))


# ---------------------------------------------------------------------------
# Dashboard and bot control
# ---------------------------------------------------------------------------

@app.route("/admin")
def admin_index():
    """Greet a Discord-authenticated admin; everyone else goes to login."""
    is_admin = (
        "user_id" in session
        and session.get("permission", 0) >= ADMIN_PERMISSION_LEVEL
    )
    if is_admin:
        # The name comes from Discord; escape it because this is sent as HTML.
        return f"Hello, {html.escape(str(session['username']))}! Welcome to the Admin Panel."
    return redirect(url_for("login"))


@app.route("/")
@login_required
def index():
    """Render the dashboard with the current bot status."""
    return render_template("index.html", bot_running=bot_status())


# NOTE(review): start/stop/ban/unban change state on GET requests, which
# makes them reachable by cross-site links. They stay GET here because the
# templates that call them are not visible; consider moving them to POST.
@app.route("/start_bot")
@login_required
def start():
    """Start the bot and return to the dashboard."""
    start_bot()
    return redirect(url_for("index"))


@app.route("/stop_bot")
@login_required
def stop():
    """Stop the bot and return to the dashboard."""
    stop_bot()
    return redirect(url_for("index"))


@app.route("/settings", methods=["GET", "POST"])
@login_required
def settings():
    """Show (GET) or save (POST) the two introduction texts."""
    if request.method == "POST":
        # A missing form field is saved as an empty text instead of crashing
        # on file.write(None).
        save_text_file(INTRO_FILE, request.form.get("introduction", ""))
        save_text_file(
            ASKNOTES_INTRO_FILE,
            request.form.get("asknotes_introduction", ""),
        )
        return redirect(url_for("settings"))

    # Load the current contents from the text files.
    return render_template(
        "settings.html",
        introduction=load_text_file(INTRO_FILE),
        asknotes_introduction=load_text_file(ASKNOTES_INTRO_FILE),
    )


# ---------------------------------------------------------------------------
# User management
# ---------------------------------------------------------------------------

@app.route("/users")
@login_required
def users():
    """Show a list of all users."""
    with _db_cursor(dictionary=True) as (_connection, cursor):
        cursor.execute("SELECT user_id, permission, points, ban FROM user_data")
        rows = cursor.fetchall()
    return render_template("users.html", users=rows)


@app.route("/ban_user/<user_id>")
@login_required
def ban_user(user_id):
    """Ban a user."""
    return _run_user_update(
        "UPDATE user_data SET ban = 1 WHERE user_id = %s",
        (user_id,),
        "Error banning user",
    )


@app.route("/unban_user/<user_id>")
@login_required
def unban_user(user_id):
    """Unban a user."""
    return _run_user_update(
        "UPDATE user_data SET ban = 0 WHERE user_id = %s",
        (user_id,),
        "Error unbanning user",
    )


@app.route("/update_points/<user_id>", methods=["POST"])
@login_required
def update_points(user_id):
    """Add the submitted (possibly negative) amount to a user's points."""
    try:
        points_change = int(request.form["points_change"])
    except ValueError:
        return "Invalid points value.", 400
    return _run_user_update(
        "UPDATE user_data SET points = points + %s WHERE user_id = %s",
        (points_change, user_id),
        "Error updating points",
    )


@app.route("/update_role/<user_id>", methods=["POST"])
@login_required
def update_role(user_id):
    """Set a user's role (permission level)."""
    try:
        new_permission = int(request.form["permission"])
    except ValueError:
        return "Invalid permission value.", 400
    return _run_user_update(
        "UPDATE user_data SET permission = %s WHERE user_id = %s",
        (new_permission, user_id),
        "Error updating role",
    )


# ---------------------------------------------------------------------------
# Logs
# ---------------------------------------------------------------------------

@app.route("/logs")
@login_required
def view_logs():
    """Render the page that shows the bot logs."""
    return render_template("logs.html")


@app.route("/get_logs")
@login_required
def get_logs():
    """Return the content of the log file as JSON."""
    try:
        with open(LOG_FILE_PATH, 'r', encoding='utf-8') as file:
            return jsonify({"logs": file.read()})
    except FileNotFoundError:
        return jsonify({"logs": "Log file not found."})


@app.route("/download_logs")
@login_required
def download_logs():
    """Offer the log file as a download (404 JSON if it does not exist)."""
    # Absolute path so this is the same file open() reads in get_logs(),
    # independent of how send_file resolves relative paths.
    log_path = os.path.abspath(LOG_FILE_PATH)
    if not os.path.isfile(log_path):
        return jsonify({"status": "error", "message": "Log file not found."}), 404
    return send_file(log_path, as_attachment=True)


@app.route("/archive_logs", methods=["POST"])
@login_required
def archive_logs():
    """Move the current log file into the archive and start an empty one."""
    if not os.path.exists(LOG_FILE_PATH):
        return jsonify({"status": "error", "message": "Log file not found."})

    timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
    archive_file = os.path.join(ARCHIVE_DIR, f"log_{timestamp}.log")
    shutil.move(LOG_FILE_PATH, archive_file)
    with open(LOG_FILE_PATH, 'w', encoding='utf-8') as file:
        file.write("")  # start a new, empty log file
    return jsonify({"status": "success", "message": "Logs archived successfully."})


if __name__ == "__main__":
    # The Werkzeug debugger allows code execution from the browser, so it is
    # opt-in (FLASK_DEBUG=1) rather than always on for a 0.0.0.0 listener.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in {"1", "true", "yes"}
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)