__version__ = "dev-0.4.6" __all__ = ["Discordbot-chatai-webpanel (Discord)"] __author__ = "SimolZimol" from flask import Flask, render_template, redirect, url_for, request, session, jsonify, send_file, flash from requests_oauthlib import OAuth2Session import os import subprocess import psutil import mysql.connector from datetime import datetime from requests.exceptions import HTTPError app = Flask(__name__) app.secret_key = os.getenv("FLASK_SECRET_KEY") LOG_FILE_PATH = os.path.join("logs", f"{datetime.now().strftime('%Y-%m-%d')}.log") # Verwende Umgebungsvariablen aus Coolify für die Datenbankverbindung DB_HOST = os.getenv("DB_HOST") DB_PORT = os.getenv("DB_PORT") DB_USER = os.getenv("DB_USER") DB_PASS = os.getenv("DB_PASSWORD") DB_NAME = os.getenv("DB_DATABASE") DISCORD_CLIENT_ID = os.getenv("DISCORD_CLIENT_ID") DISCORD_CLIENT_SECRET = os.getenv("DISCORD_CLIENT_SECRET") DISCORD_REDIRECT_URI = os.getenv("DISCORD_REDIRECT_URI") DISCORD_OAUTH2_URL = "https://discord.com/api/oauth2/authorize" DISCORD_TOKEN_URL = "https://discord.com/api/oauth2/token" DISCORD_API_URL = "https://discord.com/api/users/@me" os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' # Globale Variablen für die Intros INTRO_FILE = "introduction.txt" ASKNOTES_INTRO_FILE = "asknotesintro.txt" # Speichern der Prozess-ID bot_process = None def bot_status(): """Überprüft, ob der Bot läuft.""" global bot_process if bot_process is None: return False return bot_process.poll() is None # None bedeutet, dass der Prozess noch läuft def start_bot(): """Startet den Bot.""" global bot_process if not bot_status(): bot_process = subprocess.Popen(["python", "bot.py"], cwd=os.path.dirname(os.path.abspath(__file__))) else: print("Bot läuft bereits.") def stop_bot(): """Stoppt den Bot.""" global bot_process if bot_process and bot_status(): bot_process.terminate() bot_process.wait() # Warten, bis der Prozess beendet ist bot_process = None else: print("Bot läuft nicht.") def load_text_file(file_path): """Lädt den Inhalt einer Textdatei.""" if os.path.exists(file_path): with open(file_path, 'r', encoding='utf-8') as file: return file.read() return "" def save_text_file(file_path, content): """Speichert den Inhalt in einer Textdatei.""" with open(file_path, 'w', encoding='utf-8') as file: file.write(content) def get_db_connection(): """Stellt eine Verbindung zur MySQL-Datenbank her.""" return mysql.connector.connect( host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASS, database=DB_NAME ) def make_discord_session(token=None, state=None): return OAuth2Session( DISCORD_CLIENT_ID, token=token, state=state, redirect_uri=DISCORD_REDIRECT_URI, scope=["identify", "guilds"] ) def is_admin(): """Überprüft, ob der Benutzer Admin-Rechte in der ausgewählten Guild hat.""" if "discord_user" in session and "selected_guild_id" in session: user_info = session["discord_user"] user_id = user_info["id"] guild_id = session["selected_guild_id"] # Überprüfe die Admin-Rechte des Benutzers in der ausgewählten Guild connection = get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute("SELECT permission FROM user_data WHERE user_id = %s AND guild_id = %s", (user_id, guild_id)) user_data = cursor.fetchone() cursor.close() connection.close() if user_data and user_data["permission"] >= 8: return True return False @app.route("/") def landing_page(): """Ungeschützte Landing Page""" return render_template("landing.html") @app.route("/login") def login(): """Startet den Discord-OAuth2-Flow.""" discord = make_discord_session() authorization_url, state = discord.authorization_url(DISCORD_OAUTH2_URL) session['oauth_state'] = state return redirect(authorization_url) @app.route("/about") def about(): """Zeigt eine öffentliche Über uns Seite an.""" return render_template("about.html") @app.route("/contact") def contact(): """Zeigt eine öffentliche Kontaktseite an.""" return render_template("contact.html") @app.route("/faq") def faq(): """Zeigt eine öffentliche FAQ Seite an.""" return render_template("faq.html") @app.route("/help") def help_page(): """Zeigt eine öffentliche Hilfeseite an.""" return render_template("help.html") @app.route("/callback") def callback(): """Verarbeitet den OAuth2-Rückruf von Discord.""" discord = make_discord_session(state=session.get("oauth_state")) try: token = discord.fetch_token( DISCORD_TOKEN_URL, client_secret=DISCORD_CLIENT_SECRET, authorization_response=request.url, ) except HTTPError as e: flash("OAuth2-Authentifizierung fehlgeschlagen.", "danger") return redirect(url_for("landing_page")) session['oauth_token'] = token # User-Informationen von Discord abrufen user_info = discord.get(DISCORD_API_URL).json() # Speichere die Benutzerinformationen in der Session session['discord_user'] = user_info # Holen der Guilds, denen der Benutzer angehört guilds_response = discord.get("https://discord.com/api/users/@me/guilds") guilds = guilds_response.json() # Speichern der Guilds in der Session session['discord_guilds'] = guilds return redirect(url_for("select_guild")) @app.route("/select_guild", methods=["GET", "POST"]) def select_guild(): """Lässt den Benutzer eine Guild auswählen, die er verwalten möchte.""" if "discord_user" in session and "discord_guilds" in session: guilds = session["discord_guilds"] if request.method == "POST": selected_guild_id = request.form.get("guild_id") # Überprüfen, ob die ausgewählte Guild in der Liste der Guilds des Benutzers ist if any(str(guild["id"]) == selected_guild_id for guild in guilds): session["selected_guild_id"] = int(selected_guild_id) return redirect(url_for("admin_dashboard")) else: flash("Ungültige Guild ausgewählt.", "danger") return render_template("select_guild.html", guilds=guilds) return redirect(url_for("landing_page")) @app.route("/admin_dashboard") def admin_dashboard(): """Zeigt das Admin-Dashboard an (nur für Admins) für die ausgewählte Guild.""" if "discord_user" in session and "selected_guild_id" in session: user_info = session["discord_user"] user_id = user_info["id"] guild_id = session["selected_guild_id"] # Überprüfe, ob der Benutzer Admin-Rechte hat connection = get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute("SELECT permission FROM user_data WHERE user_id = %s AND guild_id = %s", (user_id, guild_id)) user_data = cursor.fetchone() cursor.close() connection.close() if user_data and user_data["permission"] >= 8: return render_template("admin_dashboard.html", user_info=user_info, bot_running=bot_status(), guild_id=guild_id) else: return redirect(url_for("user_dashboard")) return redirect(url_for("landing_page")) @app.route("/user_dashboard") def user_dashboard(): """Zeigt das User-Dashboard an für die ausgewählte Guild.""" if "discord_user" in session and "selected_guild_id" in session: user_info = session["discord_user"] user_id = user_info["id"] guild_id = session["selected_guild_id"] connection = get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute("SELECT points, permission, ban FROM user_data WHERE user_id = %s AND guild_id = %s", (user_id, guild_id)) user_data = cursor.fetchone() cursor.close() connection.close() if user_data: return render_template("user_dashboard.html", user_info=user_info, user_data=user_data, bot_running=bot_status(), guild_id=guild_id) else: return "User data not found", 404 return redirect(url_for("landing_page")) @app.route("/logout") def logout(): """Löscht die Benutzersitzung und meldet den Benutzer ab.""" session.pop('discord_user', None) session.pop('oauth_token', None) session.pop('discord_guilds', None) session.pop('selected_guild_id', None) return redirect(url_for('landing_page')) @app.route("/start_bot") def start(): if is_admin(): start_bot() user_info = session["discord_user"] guild_id = session["selected_guild_id"] return render_template("admin_dashboard.html", user_info=user_info, bot_running=bot_status(), guild_id=guild_id) return redirect(url_for("landing_page")) @app.route("/stop_bot") def stop(): if is_admin(): stop_bot() user_info = session["discord_user"] guild_id = session["selected_guild_id"] return render_template("admin_dashboard.html", user_info=user_info, bot_running=bot_status(), guild_id=guild_id) return redirect(url_for("landing_page")) @app.route("/settings", methods=["GET", "POST"]) def settings(): if is_admin() and "selected_guild_id" in session: if request.method == "POST": introduction = request.form.get("introduction") asknotes_introduction = request.form.get("asknotes_introduction") guild_id = session["selected_guild_id"] # Speichern der Intros in der Datenbank connection = get_db_connection() cursor = connection.cursor() try: cursor.execute(""" UPDATE guild_settings SET introduction = %s, asknotes_introduction = %s WHERE guild_id = %s """, (introduction, asknotes_introduction, guild_id)) connection.commit() flash("Einstellungen erfolgreich aktualisiert!", "success") except Exception as e: print(f"Error updating settings: {e}") connection.rollback() flash("Fehler beim Aktualisieren der Einstellungen.", "danger") finally: cursor.close() connection.close() return redirect(url_for("settings")) # Laden der aktuellen Einstellungen aus der Datenbank guild_id = session["selected_guild_id"] connection = get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute("SELECT introduction, asknotes_introduction FROM guild_settings WHERE guild_id = %s", (guild_id,)) settings = cursor.fetchone() cursor.close() connection.close() introduction = settings["introduction"] if settings else "" asknotes_introduction = settings["asknotes_introduction"] if settings else "" return render_template("settings.html", introduction=introduction, asknotes_introduction=asknotes_introduction) return redirect(url_for("landing_page")) @app.route("/users") def users(): """Zeigt eine Liste aller Benutzer in der ausgewählten Guild an.""" if is_admin() and "selected_guild_id" in session: guild_id = session["selected_guild_id"] connection = get_db_connection() cursor = connection.cursor(dictionary=True) cursor.execute("SELECT user_id, permission, points, ban FROM user_data WHERE guild_id = %s", (guild_id,)) users = cursor.fetchall() cursor.close() connection.close() return render_template("users.html", users=users, guild_id=guild_id) return redirect(url_for("landing_page")) @app.route("/ban_user/") def ban_user(user_id): """Bannt einen Benutzer in der ausgewählten Guild.""" if is_admin() and "selected_guild_id" in session: guild_id = session["selected_guild_id"] connection = get_db_connection() cursor = connection.cursor() try: cursor.execute("UPDATE user_data SET ban = 1 WHERE user_id = %s AND guild_id = %s", (user_id, guild_id)) connection.commit() flash("Benutzer erfolgreich gebannt.", "success") except Exception as e: print(f"Error banning user: {e}") connection.rollback() flash("Fehler beim Bannen des Benutzers.", "danger") finally: cursor.close() connection.close() return redirect(url_for("users")) return redirect(url_for("landing_page")) @app.route("/unban_user/") def unban_user(user_id): """Entbannt einen Benutzer in der ausgewählten Guild.""" if is_admin() and "selected_guild_id" in session: guild_id = session["selected_guild_id"] connection = get_db_connection() cursor = connection.cursor() try: cursor.execute("UPDATE user_data SET ban = 0 WHERE user_id = %s AND guild_id = %s", (user_id, guild_id)) connection.commit() flash("Benutzer erfolgreich entbannt.", "success") except Exception as e: print(f"Error unbanning user: {e}") connection.rollback() flash("Fehler beim Entbannen des Benutzers.", "danger") finally: cursor.close() connection.close() return redirect(url_for("users")) return redirect(url_for("landing_page")) @app.route("/update_points/", methods=["POST"]) def update_points(user_id): """Aktualisiert die Punkte eines Benutzers in der ausgewählten Guild.""" if is_admin() and "selected_guild_id" in session: points_change = int(request.form["points_change"]) guild_id = session["selected_guild_id"] connection = get_db_connection() cursor = connection.cursor() try: cursor.execute("UPDATE user_data SET points = points + %s WHERE user_id = %s AND guild_id = %s", (points_change, user_id, guild_id)) connection.commit() flash("Punkte erfolgreich aktualisiert.", "success") except Exception as e: print(f"Error updating points: {e}") connection.rollback() flash("Fehler beim Aktualisieren der Punkte.", "danger") finally: cursor.close() connection.close() return redirect(url_for("users")) return redirect(url_for("landing_page")) @app.route("/update_role/", methods=["POST"]) def update_role(user_id): """Aktualisiert die Rolle (Berechtigung) eines Benutzers in der ausgewählten Guild.""" if is_admin() and "selected_guild_id" in session: new_permission = int(request.form["permission"]) guild_id = session["selected_guild_id"] connection = get_db_connection() cursor = connection.cursor() try: cursor.execute("UPDATE user_data SET permission = %s WHERE user_id = %s AND guild_id = %s", (new_permission, user_id, guild_id)) connection.commit() flash("Rolle erfolgreich aktualisiert.", "success") except Exception as e: print(f"Error updating role: {e}") connection.rollback() flash("Fehler beim Aktualisieren der Rolle.", "danger") finally: cursor.close() connection.close() return redirect(url_for("users")) return redirect(url_for("landing_page")) @app.route("/logs") def view_logs(): """Zeigt die Logs des Bots im Admin-Panel an.""" if is_admin(): return render_template("logs.html") return redirect(url_for("landing_page")) @app.route("/get_logs") def get_logs(): """Liest den Inhalt der Log-Datei und gibt ihn zurück.""" if is_admin(): try: with open(LOG_FILE_PATH, 'r', encoding='utf-8') as file: logs = file.read() return jsonify({"logs": logs}) except FileNotFoundError: return jsonify({"logs": "Log file not found."}) return redirect(url_for("landing_page")) @app.route("/download_logs") def download_logs(): """Bietet die Log-Datei zum Download an.""" if is_admin(): return send_file(LOG_FILE_PATH, as_attachment=True) return redirect(url_for("landing_page")) @app.route("/admin/giveaways", methods=["GET", "POST"]) def admin_giveaways(): """Zeigt eine Liste aller Giveaways an und ermöglicht das Bearbeiten und Sortieren in der ausgewählten Guild.""" if is_admin() and "selected_guild_id" in session: guild_id = session["selected_guild_id"] connection = get_db_connection() # Verbindung zur Giveaway-Datenbank cursor = connection.cursor(dictionary=True) # Sortierung nach bestimmten Feldern sort_field = request.args.get("sort", "id") # Standardmäßig nach 'id' sortieren order = request.args.get("order", "asc") # Standardmäßig aufsteigend sortieren # Holen aller Giveaways aus der Datenbank für die ausgewählte Guild cursor.execute(f"SELECT * FROM giveaway_data WHERE guild_id = %s ORDER BY {sort_field} {order}", (guild_id,)) giveaways = cursor.fetchall() cursor.close() connection.close() return render_template("admin_giveaways.html", giveaways=giveaways, sort_field=sort_field, order=order, guild_id=guild_id) return redirect(url_for("login")) @app.route("/admin/giveaways/edit/", methods=["GET", "POST"]) def edit_giveaway(giveaway_id): """Bearbeitet ein spezifisches Giveaway in der ausgewählten Guild.""" if is_admin() and "selected_guild_id" in session: guild_id = session["selected_guild_id"] connection = get_db_connection() # Verbindung zur Giveaway-Datenbank cursor = connection.cursor(dictionary=True) if request.method == "POST": platform = request.form.get("platform") name = request.form.get("name") game_key = request.form.get("game_key") winner_dc_id = request.form.get("winner_dc_id") aktiv = bool(request.form.get("aktiv")) # Update der Giveaways-Daten für die ausgewählte Guild cursor.execute(""" UPDATE giveaway_data SET platform = %s, name = %s, game_key = %s, winner_dc_id = %s, aktiv = %s WHERE id = %s AND guild_id = %s """, (platform, name, game_key, winner_dc_id, aktiv, giveaway_id, guild_id)) connection.commit() flash("Giveaway erfolgreich aktualisiert!", "success") cursor.close() connection.close() return redirect(url_for("admin_giveaways")) # Daten des spezifischen Giveaways laden, nur für die ausgewählte Guild cursor.execute("SELECT * FROM giveaway_data WHERE id = %s AND guild_id = %s", (giveaway_id, guild_id)) giveaway = cursor.fetchone() cursor.close() connection.close() if not giveaway: flash("Giveaway nicht gefunden oder Zugriff verweigert.", "danger") return redirect(url_for("admin_giveaways")) return render_template("edit_giveaway.html", giveaway=giveaway, guild_id=guild_id) return redirect(url_for("login")) @app.route("/user/giveaways", methods=["GET"]) def user_giveaways(): """Zeigt dem Benutzer die Giveaways, die er in der ausgewählten Guild gewonnen hat.""" if "discord_user" in session and "selected_guild_id" in session: user_info = session["discord_user"] user_id = user_info["id"] guild_id = session["selected_guild_id"] connection = get_db_connection() # Verbindung zur Giveaway-Datenbank cursor = connection.cursor(dictionary=True) # Suche nach Giveaways, bei denen der eingeloggte Benutzer der Gewinner ist, in der ausgewählten Guild cursor.execute(""" SELECT * FROM giveaway_data WHERE winner_dc_id = %s AND guild_id = %s """, (user_id, guild_id)) won_giveaways = cursor.fetchall() cursor.close() connection.close() return render_template("user_giveaways.html", user_info=user_info, giveaways=won_giveaways, guild_id=guild_id) return redirect(url_for("login")) @app.route("/user/giveaway/redeem/", methods=["GET", "POST"]) def redeem_giveaway(uuid): """Erlaubt dem Benutzer, den Giveaway-Code abzurufen in der ausgewählten Guild.""" if "discord_user" in session and "selected_guild_id" in session: user_info = session["discord_user"] user_id = user_info["id"] guild_id = session["selected_guild_id"] connection = get_db_connection() # Verbindung zur Giveaway-Datenbank cursor = connection.cursor(dictionary=True) # Überprüfen, ob der eingeloggte Benutzer der Gewinner ist und die Guild stimmt cursor.execute("SELECT * FROM giveaway_data WHERE uuid = %s AND winner_dc_id = %s AND guild_id = %s", (uuid, user_id, guild_id)) giveaway = cursor.fetchone() if giveaway: if request.method == "POST": # Wenn der Benutzer den Key aufdeckt, setze `aktiv` auf TRUE cursor.execute("UPDATE giveaway_data SET aktiv = TRUE WHERE uuid = %s AND guild_id = %s", (uuid, guild_id)) connection.commit() # Key aufdecken key = giveaway["game_key"] cursor.close() connection.close() return render_template("redeem_giveaway.html", giveaway=giveaway, key=key) # Zeige die Seite mit dem Button an, um den Key aufzudecken cursor.close() connection.close() return render_template("redeem_giveaway.html", giveaway=giveaway, key=None) else: flash("Du bist nicht der Gewinner dieses Giveaways oder das Giveaway ist nicht mehr verfügbar.", "danger") cursor.close() connection.close() return redirect(url_for("user_giveaways")) return redirect(url_for("login")) if __name__ == "__main__": app.run(host="0.0.0.0", port=5000, debug=True)