Files
Discord-ai-chatbot/app.py
2024-09-12 16:33:09 +02:00

540 lines
18 KiB
Python

# Module metadata for the web panel.
__version__ = "dev-0.4.6"
# NOTE(review): __all__ is conventionally a list of names exported by
# `from module import *`. This entry is a descriptive label (it contains
# spaces and parentheses, so it cannot be an identifier), which would make a
# star-import of this module fail — confirm whether this is intentional.
__all__ = ["Discordbot-chatai-webpanel (Discord)"]
__author__ = "SimolZimol"
from flask import Flask, render_template, redirect, url_for, request, session, jsonify, send_file, flash
from requests_oauthlib import OAuth2Session
import os
import subprocess
import psutil
import mysql.connector
from datetime import datetime
# Flask application object; the session secret key is read from the environment.
app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY")
# Log file path built from the date at which this module is loaded.
# NOTE(review): evaluated once at import time, so the path does not follow a
# date change while the process keeps running — confirm this is intended.
LOG_FILE_PATH = os.path.join("logs", f"{datetime.now().strftime('%Y-%m-%d')}.log")
# Use environment variables from Coolify for the database connection
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_DATABASE")
# Settings for the separate giveaway database, also read from the environment.
GIVEAWAY_DB_HOST = os.getenv("GIVEAWAY_DB_HOST")
GIVEAWAY_DB_PORT = os.getenv("GIVEAWAY_DB_PORT")
GIVEAWAY_DB_USER = os.getenv("GIVEAWAY_DB_USER")
GIVEAWAY_DB_PASSWORD = os.getenv("GIVEAWAY_DB_PASSWORD")
GIVEAWAY_DB_DATABASE = os.getenv("GIVEAWAY_DB_DATABASE")
# Discord OAuth2 client credentials and endpoint URLs.
DISCORD_CLIENT_ID = os.getenv("DISCORD_CLIENT_ID")
DISCORD_CLIENT_SECRET = os.getenv("DISCORD_CLIENT_SECRET")
DISCORD_REDIRECT_URI = os.getenv("DISCORD_REDIRECT_URI")
DISCORD_OAUTH2_URL = "https://discord.com/api/oauth2/authorize"
DISCORD_TOKEN_URL = "https://discord.com/api/oauth2/token"
DISCORD_API_URL = "https://discord.com/api/users/@me"
# Lets oauthlib run the OAuth2 flow over plain HTTP.
# NOTE(review): set unconditionally — presumably for local development or a
# TLS-terminating proxy; confirm before exposing the panel directly.
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
# Global variables for the intros
INTRO_FILE = "introduction.txt"
ASKNOTES_INTRO_FILE = "asknotesintro.txt"
# Holds the bot's subprocess handle; None while no bot has been started
bot_process = None
def get_giveaway_db_connection():
    """Create a connection to the giveaway database.

    The settings are read from the GIVEAWAY_DB_* environment variables each
    time the function is called.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
    """
    settings = {
        "host": os.getenv("GIVEAWAY_DB_HOST"),
        "user": os.getenv("GIVEAWAY_DB_USER"),
        "password": os.getenv("GIVEAWAY_DB_PASSWORD"),
        "database": os.getenv("GIVEAWAY_DB_DATABASE"),
    }
    # os.getenv returns None when GIVEAWAY_DB_PORT is not set. Leave the port
    # out in that case so the connector applies its own default instead of
    # receiving None.
    port = os.getenv("GIVEAWAY_DB_PORT")
    if port:
        settings["port"] = port
    return mysql.connector.connect(**settings)
def bot_status():
    """Report whether the bot subprocess is currently running.

    Returns:
        False when no process handle is stored; otherwise True exactly when
        the stored process has not exited yet.
    """
    global bot_process
    # poll() yields None for as long as the child process is still alive.
    return bot_process is not None and bot_process.poll() is None
def start_bot():
    """Launch ``bot.py`` as a subprocess unless the bot is already running.

    The process is started with this file's directory as working directory
    and its handle is stored in the module-level ``bot_process``.
    """
    global bot_process
    if bot_status():
        print("Bot läuft bereits.")
        return
    script_dir = os.path.dirname(os.path.abspath(__file__))
    bot_process = subprocess.Popen(["python", "bot.py"], cwd=script_dir)
def stop_bot():
    """Terminate the bot subprocess if it is running.

    Waits for the process to exit and then clears the module-level
    ``bot_process`` handle. Prints a notice when there is nothing to stop.
    """
    global bot_process
    if not (bot_process and bot_status()):
        print("Bot läuft nicht.")
        return
    bot_process.terminate()
    # Block until the process has actually exited.
    bot_process.wait()
    bot_process = None
def load_text_file(file_path):
    """Return the contents of a UTF-8 text file.

    Args:
        file_path: Path of the file to read.

    Returns:
        The file's text, or an empty string when the path does not exist.
    """
    if not os.path.exists(file_path):
        return ""
    with open(file_path, 'r', encoding='utf-8') as handle:
        contents = handle.read()
    return contents
def save_text_file(file_path, content):
    """Write text to a file as UTF-8, replacing any existing contents.

    Args:
        file_path: Path of the file to write.
        content: Text to store in the file.
    """
    with open(file_path, mode='w', encoding='utf-8') as handle:
        handle.write(content)
def get_db_connection():
    """Open a connection to the main MySQL database.

    Uses the module-level DB_HOST, DB_PORT, DB_USER, DB_PASS and DB_NAME
    settings.

    Returns:
        The connection object returned by ``mysql.connector.connect``.
    """
    settings = {
        "host": DB_HOST,
        "user": DB_USER,
        "password": DB_PASS,
        "database": DB_NAME,
    }
    # DB_PORT is None when the environment variable is not set. Leave the port
    # out in that case so the connector applies its own default instead of
    # receiving None.
    if DB_PORT:
        settings["port"] = DB_PORT
    return mysql.connector.connect(**settings)
def make_discord_session(token=None, state=None):
    """Build an OAuth2 session for Discord requesting the "identify" scope.

    Args:
        token: Optional token passed through to the session.
        state: Optional OAuth2 state value passed through to the session.

    Returns:
        An ``OAuth2Session`` configured with the Discord client ID and
        redirect URI.
    """
    session_options = {
        "token": token,
        "state": state,
        "redirect_uri": DISCORD_REDIRECT_URI,
        "scope": ["identify"],
    }
    return OAuth2Session(DISCORD_CLIENT_ID, **session_options)
def is_admin():
    """Check whether the logged-in user has admin rights.

    Returns:
        True when the session holds a Discord user whose ``permission``
        value in ``user_data`` is at least 8, otherwise False.
    """
    if "discord_user" not in session:
        return False
    discord_id = session["discord_user"]["id"]
    # Look up the stored permission level for this user.
    db = get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute("SELECT permission FROM user_data WHERE user_id = %s", (discord_id,))
    row = cur.fetchone()
    cur.close()
    db.close()
    if row and row["permission"] >= 8:
        return True
    return False
@app.route("/")
def landing_page():
    """Render the landing page; no login is required."""
    template_name = "landing.html"
    return render_template(template_name)
@app.route("/login")
def login():
    """Start the Discord OAuth2 flow by redirecting to the authorization URL."""
    oauth = make_discord_session()
    auth_url, oauth_state = oauth.authorization_url(DISCORD_OAUTH2_URL)
    # Keep the state in the session so the callback can pass it back in.
    session['oauth_state'] = oauth_state
    return redirect(auth_url)
@app.route("/about")
def about():
    """Render the public "about us" page."""
    template_name = "about.html"
    return render_template(template_name)
@app.route("/contact")
def contact():
    """Render the public contact page."""
    template_name = "contact.html"
    return render_template(template_name)
@app.route("/faq")
def faq():
    """Render the public FAQ page."""
    template_name = "faq.html"
    return render_template(template_name)
@app.route("/help")
def help_page():
    """Render the public help page."""
    template_name = "help.html"
    return render_template(template_name)
@app.route("/callback")
def callback():
    """Handle the OAuth2 redirect back from Discord.

    Exchanges the authorization response for a token, fetches the user's
    Discord profile, stores both in the session and redirects to the admin
    or user dashboard depending on the stored permission level.

    Returns:
        A redirect to the landing page when the authorization was not
        granted or no usable profile was returned, otherwise a redirect to
        the matching dashboard.
    """
    # The provider redirects back with an "error" query parameter when the
    # user cancels the consent screen; there is nothing to exchange then.
    if request.args.get("error"):
        return redirect(url_for("landing_page"))
    discord = make_discord_session(state=session.get("oauth_state"))
    token = discord.fetch_token(
        DISCORD_TOKEN_URL,
        client_secret=DISCORD_CLIENT_SECRET,
        authorization_response=request.url,
    )
    # Fetch the user's profile from Discord.
    user_info = discord.get(DISCORD_API_URL).json()
    # Every later lookup keys on the Discord ID, so do not store a profile
    # response (e.g. an API error payload) that lacks it.
    if "id" not in user_info:
        return redirect(url_for("landing_page"))
    session['oauth_token'] = token
    session['discord_user'] = user_info
    # is_admin() runs the same permission query against the session user
    # that this route previously duplicated inline.
    if is_admin():
        return redirect(url_for("admin_dashboard"))
    return redirect(url_for("user_dashboard"))
@app.route("/admin_dashboard")
def admin_dashboard():
    """Render the admin dashboard for admins.

    Logged-in users without admin rights are redirected to the user
    dashboard; visitors without a session are sent to the landing page.
    """
    if "discord_user" not in session:
        return redirect(url_for("landing_page"))
    user_info = session["discord_user"]
    # Look up the stored permission level for this user.
    db = get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute("SELECT permission FROM user_data WHERE user_id = %s", (user_info["id"],))
    row = cur.fetchone()
    cur.close()
    db.close()
    if not (row and row["permission"] >= 8):
        return redirect(url_for("user_dashboard"))
    running = bot_status()
    return render_template("admin_dashboard.html", user_info=user_info, bot_running=running)
@app.route("/user_dashboard")
def user_dashboard():
    """Render the user dashboard with the user's points and rank.

    Falls back to 0 points and the rank "User" when no ``user_data`` row
    exists for the logged-in user. Visitors without a session are redirected
    to the login route.
    """
    if "discord_user" not in session:
        return redirect(url_for("login"))
    user_info = session["discord_user"]
    user_id = user_info["id"]
    connection = get_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        # `rank` is quoted because RANK is a reserved word in MySQL 8.0.2+;
        # the result key stays "rank".
        cursor.execute("SELECT points, `rank` FROM user_data WHERE user_id = %s", (user_id,))
        user_data = cursor.fetchone()
    finally:
        # Release the connection even when the query fails.
        cursor.close()
        connection.close()
    if user_data:
        user_points = user_data["points"]
        user_rank = user_data["rank"]
    else:
        # No row found: use default values.
        user_points = 0
        user_rank = "User"
    return render_template("user_dashboard.html", user_info=user_info, user_points=user_points, user_rank=user_rank)
@app.route("/logout")
def logout():
    """Drop the user and token entries from the session and go to the landing page."""
    for session_key in ('discord_user', 'oauth_token'):
        session.pop(session_key, None)
    return redirect(url_for('landing_page'))
@app.route("/start_bot")
def start():
    """Start the bot (admins only) and re-render the admin dashboard."""
    if not is_admin():
        return redirect(url_for("landing_page"))
    start_bot()
    current_user = session["discord_user"]
    return render_template("admin_dashboard.html", user_info=current_user, bot_running=bot_status())
@app.route("/stop_bot")
def stop():
    """Stop the bot (admins only) and re-render the admin dashboard."""
    if not is_admin():
        return redirect(url_for("landing_page"))
    stop_bot()
    current_user = session["discord_user"]
    return render_template("admin_dashboard.html", user_info=current_user, bot_running=bot_status())
@app.route("/settings", methods=["GET", "POST"])
def settings():
    """Show and save the two intro texts (admins only).

    On POST, each submitted intro field is written to its text file and the
    browser is redirected back to this page. On GET, the current file
    contents are rendered into the settings template. Non-admins are
    redirected to the landing page.
    """
    if not is_admin():
        return redirect(url_for("landing_page"))
    if request.method == "POST":
        intro_fields = (
            (INTRO_FILE, "introduction"),
            (ASKNOTES_INTRO_FILE, "asknotes_introduction"),
        )
        for file_path, field_name in intro_fields:
            text = request.form.get(field_name)
            # A field absent from the form yields None. Skip it so the
            # stored text is kept, instead of opening the file for writing
            # and then failing on write(None).
            if text is not None:
                save_text_file(file_path, text)
        return redirect(url_for("settings"))
    # Load the current contents of the text files.
    introduction = load_text_file(INTRO_FILE)
    asknotes_introduction = load_text_file(ASKNOTES_INTRO_FILE)
    return render_template("settings.html", introduction=introduction, asknotes_introduction=asknotes_introduction)
@app.route("/users")
def users():
    """Render the list of all users (admins only)."""
    if not is_admin():
        return redirect(url_for("landing_page"))
    db = get_db_connection()
    cur = db.cursor(dictionary=True)
    cur.execute("SELECT user_id, permission, points, ban FROM user_data")
    rows = cur.fetchall()
    cur.close()
    db.close()
    return render_template("users.html", users=rows)
@app.route("/ban_user/<int:user_id>")
def ban_user(user_id):
    """Set the ban flag for a user (admins only), then return to the user list.

    Database errors are printed and rolled back; the redirect to the user
    list happens either way.
    """
    if not is_admin():
        return redirect(url_for("landing_page"))
    db = get_db_connection()
    cur = db.cursor()
    try:
        cur.execute("UPDATE user_data SET ban = 1 WHERE user_id = %s", (user_id,))
        db.commit()
    except Exception as e:
        print(f"Error banning user: {e}")
        db.rollback()
    finally:
        cur.close()
        db.close()
    return redirect(url_for("users"))
@app.route("/update_points/<int:user_id>", methods=["POST"])
def update_points(user_id):
    """Add the submitted points delta to a user's points (admins only).

    The ``points_change`` form field must be an integer (it may be
    negative). Invalid or missing values are reported on stdout and leave
    the database untouched. Database errors are printed and rolled back.

    Returns:
        A redirect to the user list for admins, or to the landing page for
        everyone else.
    """
    if not is_admin():
        return redirect(url_for("landing_page"))
    raw_change = request.form.get("points_change")
    try:
        points_change = int(raw_change)
    except (TypeError, ValueError):
        # Reject bad input here rather than failing the request with an
        # unhandled exception.
        print(f"Error updating points: invalid points_change {raw_change!r}")
        return redirect(url_for("users"))
    connection = get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute("UPDATE user_data SET points = points + %s WHERE user_id = %s", (points_change, user_id))
        connection.commit()
    except Exception as e:
        print(f"Error updating points: {e}")
        connection.rollback()
    finally:
        cursor.close()
        connection.close()
    return redirect(url_for("users"))
@app.route("/unban_user/<int:user_id>")
def unban_user(user_id):
    """Clear the ban flag for a user (admins only), then return to the user list.

    Database errors are printed and rolled back; the redirect to the user
    list happens either way.
    """
    if not is_admin():
        return redirect(url_for("landing_page"))
    db = get_db_connection()
    cur = db.cursor()
    try:
        cur.execute("UPDATE user_data SET ban = 0 WHERE user_id = %s", (user_id,))
        db.commit()
    except Exception as e:
        print(f"Error unbanning user: {e}")
        db.rollback()
    finally:
        cur.close()
        db.close()
    return redirect(url_for("users"))
@app.route("/update_role/<int:user_id>", methods=["POST"])
def update_role(user_id):
    """Set a user's permission level (admins only).

    The ``permission`` form field must be an integer. Invalid or missing
    values are reported on stdout and leave the database untouched.
    Database errors are printed and rolled back.

    Returns:
        A redirect to the user list for admins, or to the landing page for
        everyone else.
    """
    if not is_admin():
        return redirect(url_for("landing_page"))
    raw_permission = request.form.get("permission")
    try:
        new_permission = int(raw_permission)
    except (TypeError, ValueError):
        # Permission levels are compared numerically elsewhere in this file
        # (>= 8), so only integer values are written.
        print(f"Error updating role: invalid permission {raw_permission!r}")
        return redirect(url_for("users"))
    connection = get_db_connection()
    cursor = connection.cursor()
    try:
        cursor.execute("UPDATE user_data SET permission = %s WHERE user_id = %s", (new_permission, user_id))
        connection.commit()
    except Exception as e:
        print(f"Error updating role: {e}")
        connection.rollback()
    finally:
        cursor.close()
        connection.close()
    return redirect(url_for("users"))
@app.route("/logs")
def view_logs():
    """Render the log viewer page of the admin panel (admins only)."""
    if not is_admin():
        return redirect(url_for("landing_page"))
    return render_template("logs.html")
@app.route("/get_logs")
def get_logs():
    """Return the log file's contents as JSON (admins only).

    The JSON object has a single "logs" key holding either the file text or
    a "not found" message when the file does not exist.
    """
    if not is_admin():
        return redirect(url_for("landing_page"))
    try:
        with open(LOG_FILE_PATH, 'r', encoding='utf-8') as log_file:
            log_text = log_file.read()
    except FileNotFoundError:
        return jsonify({"logs": "Log file not found."})
    return jsonify({"logs": log_text})
@app.route("/download_logs")
def download_logs():
    """Offer the log file as a download (admins only).

    Returns:
        The log file as an attachment, a 404 response with a short message
        when the file does not exist, or a redirect to the landing page for
        non-admins.
    """
    if not is_admin():
        return redirect(url_for("landing_page"))
    try:
        return send_file(LOG_FILE_PATH, as_attachment=True)
    except FileNotFoundError:
        # Answer with a 404 instead of letting the error surface as a
        # server error; get_logs also handles a missing file gracefully.
        return "Log file not found.", 404
@app.route("/admin/giveaways", methods=["GET", "POST"])
def admin_giveaways():
    """List all giveaways, optionally sorted (admins only).

    Query parameters:
        sort: Column to sort by; defaults to "id". Values that are not a
            plain column name fall back to "id".
        order: "asc" or "desc" (case-insensitive); anything else falls back
            to "asc".

    Non-admins are redirected to the login route.
    """
    if not is_admin():
        return redirect(url_for("login"))
    sort_field = request.args.get("sort", "id")
    order = request.args.get("order", "asc")
    # Both values end up inside the SQL text, where placeholders cannot be
    # used, so restrict them to a safe shape before interpolating.
    allowed_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_")
    if not sort_field or not set(sort_field) <= allowed_chars:
        sort_field = "id"
    if order.lower() not in ("asc", "desc"):
        order = "asc"
    connection = get_giveaway_db_connection()
    cursor = connection.cursor(dictionary=True)
    try:
        # Backticks make the validated name an identifier in every case.
        cursor.execute(f"SELECT * FROM giveaways ORDER BY `{sort_field}` {order}")
        giveaways = cursor.fetchall()
    finally:
        # Release the connection even when the query fails.
        cursor.close()
        connection.close()
    return render_template("admin_giveaways.html", giveaways=giveaways, sort_field=sort_field, order=order)
@app.route("/admin/giveaways/edit/<int:giveaway_id>", methods=["GET", "POST"])
def edit_giveaway(giveaway_id):
    """Edit a single giveaway (admins only).

    On POST, the submitted form values are written to the giveaway row and
    the browser is redirected to the giveaway list. On GET, the row is
    loaded and rendered into the edit form. Non-admins are redirected to
    the login route.
    """
    if not is_admin():
        return redirect(url_for("login"))
    connection = get_giveaway_db_connection()
    cursor = connection.cursor(dictionary=True)
    # try/finally closes cursor and connection on every path, including the
    # early return after a successful POST.
    try:
        if request.method == "POST":
            platform = request.form.get("platform")
            name = request.form.get("name")
            game_key = request.form.get("game_key")
            winner_dc_id = request.form.get("winner_dc_id")
            aktiv = bool(request.form.get("aktiv"))
            # Update the giveaway row.
            cursor.execute(
                """
                UPDATE giveaways
                SET platform = %s, name = %s, game_key = %s, winner_dc_id = %s, aktiv = %s
                WHERE id = %s
                """,
                (platform, name, game_key, winner_dc_id, aktiv, giveaway_id),
            )
            connection.commit()
            flash("Giveaway updated successfully!", "success")
            return redirect(url_for("admin_giveaways"))
        # Load the requested giveaway for the edit form.
        cursor.execute("SELECT * FROM giveaways WHERE id = %s", (giveaway_id,))
        giveaway = cursor.fetchone()
    finally:
        cursor.close()
        connection.close()
    return render_template("edit_giveaway.html", giveaway=giveaway)
@app.route("/user/giveaways", methods=["GET"])
def user_giveaways():
    """Render the giveaways whose winner is the logged-in user.

    Visitors without a session are redirected to the login route.
    """
    if "discord_user" not in session:
        return redirect(url_for("login"))
    user_info = session["discord_user"]
    db = get_giveaway_db_connection()
    cur = db.cursor(dictionary=True)
    # Select the giveaways that list the logged-in user as winner.
    winner_query = "\nSELECT * FROM giveaways WHERE winner_dc_id = %s\n"
    cur.execute(winner_query, (user_info["id"],))
    won = cur.fetchall()
    cur.close()
    db.close()
    return render_template("user_giveaways.html", user_info=user_info, giveaways=won)
@app.route("/user/giveaway/redeem/<uuid>", methods=["GET", "POST"])
def redeem_giveaway(uuid):
    """Let the winner of a giveaway reveal its game key.

    GET renders the redeem page without the key. POST sets ``aktiv`` to
    TRUE on the giveaway and renders the page with the key. If no giveaway
    with this UUID has the logged-in user as winner, a message is flashed
    and the user is redirected to their giveaway list. Visitors without a
    session are redirected to the login route.

    Args:
        uuid: UUID of the giveaway taken from the URL.
    """
    if "discord_user" not in session:
        return redirect(url_for("login"))
    user_id = session["discord_user"]["id"]
    reveal = request.method == "POST"
    connection = get_giveaway_db_connection()
    cursor = connection.cursor(dictionary=True)
    # All database work happens inside try/finally so cursor and connection
    # are closed on every path, not only when the giveaway was not found.
    try:
        # Only matches when the logged-in user is the recorded winner.
        cursor.execute("SELECT * FROM giveaways WHERE uuid = %s AND winner_dc_id = %s", (uuid, user_id))
        giveaway = cursor.fetchone()
        if giveaway and reveal:
            # Revealing the key sets `aktiv` to TRUE.
            cursor.execute("UPDATE giveaways SET aktiv = TRUE WHERE uuid = %s", (uuid,))
            connection.commit()
    finally:
        cursor.close()
        connection.close()
    if not giveaway:
        flash("You are not the winner of this giveaway or the giveaway is no longer available.", "danger")
        return redirect(url_for("user_giveaways"))
    if reveal:
        return render_template("redeem_giveaway.html", giveaway=giveaway, key=giveaway["game_key"])
    # Show the page with the button that reveals the key.
    return render_template("redeem_giveaway.html", giveaway=giveaway, key=None)
if __name__ == "__main__":
    # The interactive debugger that debug mode enables lets anyone who can
    # reach the server run code on it, and this server listens on all
    # interfaces. Debug mode is therefore opt-in through FLASK_DEBUG instead
    # of being always on.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true", "yes", "on")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)