313 lines
10 KiB
Python
313 lines
10 KiB
Python
__version__ = "dev-0.4.6"
|
|
__all__ = ["Discordbot-chatai-webpanel (Discord)"]
|
|
__author__ = "SimolZimol"
|
|
|
|
from flask import Flask, render_template, redirect, url_for, request, session, jsonify, send_file, flash
|
|
from requests_oauthlib import OAuth2Session
|
|
import os
|
|
import subprocess
|
|
import psutil
|
|
import mysql.connector
|
|
from datetime import datetime
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = os.getenv("FLASK_SECRET_KEY")
|
|
|
|
LOG_FILE_PATH = os.path.join("logs", f"{datetime.now().strftime('%Y-%m-%d')}.log")
|
|
|
|
# Verwende Umgebungsvariablen für die Datenbankverbindung
|
|
DB_HOST = os.getenv("DB_HOST")
|
|
DB_PORT = os.getenv("DB_PORT")
|
|
DB_USER = os.getenv("DB_USER")
|
|
DB_PASS = os.getenv("DB_PASSWORD")
|
|
DB_NAME = os.getenv("DB_DATABASE")
|
|
|
|
DISCORD_CLIENT_ID = os.getenv("DISCORD_CLIENT_ID")
|
|
DISCORD_CLIENT_SECRET = os.getenv("DISCORD_CLIENT_SECRET")
|
|
DISCORD_REDIRECT_URI = os.getenv("DISCORD_REDIRECT_URI")
|
|
DISCORD_OAUTH2_URL = "https://discord.com/api/oauth2/authorize"
|
|
DISCORD_TOKEN_URL = "https://discord.com/api/oauth2/token"
|
|
DISCORD_API_URL = "https://discord.com/api/users/@me"
|
|
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
|
|
|
|
# Speichern der Prozess-ID
|
|
bot_process = None
|
|
|
|
def bot_status():
|
|
"""Überprüft, ob der Bot läuft."""
|
|
global bot_process
|
|
if bot_process is None:
|
|
return False
|
|
return bot_process.poll() is None # None bedeutet, dass der Prozess noch läuft
|
|
|
|
def start_bot():
|
|
"""Startet den Bot."""
|
|
global bot_process
|
|
if not bot_status():
|
|
bot_process = subprocess.Popen(["python", "bot.py"], cwd=os.path.dirname(os.path.abspath(__file__)))
|
|
else:
|
|
print("Bot läuft bereits.")
|
|
|
|
def stop_bot():
|
|
"""Stoppt den Bot."""
|
|
global bot_process
|
|
if bot_process and bot_status():
|
|
bot_process.terminate()
|
|
bot_process.wait() # Warten, bis der Prozess beendet ist
|
|
bot_process = None
|
|
else:
|
|
print("Bot läuft nicht.")
|
|
|
|
def get_db_connection():
|
|
"""Stellt eine Verbindung zur MySQL-Datenbank her."""
|
|
return mysql.connector.connect(
|
|
host=DB_HOST,
|
|
port=DB_PORT,
|
|
user=DB_USER,
|
|
password=DB_PASS,
|
|
database=DB_NAME
|
|
)
|
|
|
|
def token_updater(token):
|
|
session['oauth_token'] = token
|
|
|
|
def make_discord_session(token=None, state=None):
|
|
return OAuth2Session(
|
|
DISCORD_CLIENT_ID,
|
|
token=token or session.get("oauth_token"),
|
|
state=state,
|
|
redirect_uri=DISCORD_REDIRECT_URI,
|
|
scope=["identify", "guilds"],
|
|
auto_refresh_kwargs={
|
|
'client_id': DISCORD_CLIENT_ID,
|
|
'client_secret': DISCORD_CLIENT_SECRET,
|
|
},
|
|
auto_refresh_url=DISCORD_TOKEN_URL,
|
|
token_updater=token_updater
|
|
)
|
|
|
|
|
|
|
|
def is_bot_admin():
|
|
"""Überprüft, ob der Benutzer globale Admin-Rechte hat."""
|
|
if "discord_user" in session:
|
|
user_info = session["discord_user"]
|
|
user_id = user_info["id"]
|
|
|
|
connection = get_db_connection()
|
|
cursor = connection.cursor(dictionary=True)
|
|
cursor.execute("SELECT global_permission FROM bot_data WHERE user_id = %s", (user_id,))
|
|
user_data = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
connection.close()
|
|
|
|
return user_data and user_data["global_permission"] >= 8
|
|
return False
|
|
|
|
def is_server_admin(guild_id):
|
|
"""Überprüft, ob der Benutzer Admin-Rechte auf einem bestimmten Server (Guild) hat."""
|
|
if "discord_user" in session:
|
|
user_info = session["discord_user"]
|
|
user_id = user_info["id"]
|
|
|
|
connection = get_db_connection()
|
|
cursor = connection.cursor(dictionary=True)
|
|
cursor.execute("SELECT permission FROM user_data WHERE user_id = %s AND guild_id = %s", (user_id, guild_id))
|
|
user_data = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
connection.close()
|
|
|
|
return user_data and user_data["permission"] >= 8
|
|
return False
|
|
|
|
@app.route("/")
|
|
def landing_page():
|
|
"""Landing Page"""
|
|
return render_template("landing.html")
|
|
|
|
@app.route("/login")
|
|
def login():
|
|
"""Startet den Discord-OAuth2-Flow."""
|
|
discord = make_discord_session()
|
|
authorization_url, state = discord.authorization_url(DISCORD_OAUTH2_URL)
|
|
|
|
session['oauth_state'] = state
|
|
return redirect(authorization_url)
|
|
|
|
@app.route("/callback")
|
|
def callback():
|
|
"""Verarbeitet den OAuth2-Rückruf von Discord und fügt nur die relevanten Gilden in die Datenbank ein."""
|
|
try:
|
|
discord = make_discord_session(state=session.get("oauth_state"))
|
|
|
|
# Abrufe des OAuth2-Tokens
|
|
token = discord.fetch_token(
|
|
DISCORD_TOKEN_URL,
|
|
client_secret=DISCORD_CLIENT_SECRET,
|
|
authorization_response=request.url,
|
|
)
|
|
|
|
session['oauth_token'] = token
|
|
|
|
# Abrufen der Benutzerinformationen von Discord
|
|
user_info = discord.get(DISCORD_API_URL).json()
|
|
session['discord_user'] = user_info
|
|
|
|
# Abrufen der Gilden des Benutzers
|
|
guilds_response = discord.get('https://discord.com/api/users/@me/guilds')
|
|
print(f"Guilds response status: {guilds_response.status_code}")
|
|
|
|
if guilds_response.status_code != 200:
|
|
print(f"Fehler beim Abrufen der Gilden: {guilds_response.content}")
|
|
flash("Fehler beim Abrufen der Gilden.", "danger")
|
|
return redirect(url_for("landing_page"))
|
|
|
|
guilds = guilds_response.json()
|
|
print(f"Guilds: {guilds}")
|
|
|
|
if not guilds:
|
|
flash("Du bist in keinem Server, der den Bot verwendet.", "danger")
|
|
return redirect(url_for("landing_page"))
|
|
|
|
# Verbindung zur Datenbank herstellen
|
|
connection = get_db_connection()
|
|
cursor = connection.cursor()
|
|
|
|
# Hole die Gilden aus der `user_data` Tabelle, wo der Bot auf dem Server ist
|
|
cursor.execute("SELECT DISTINCT guild_id FROM user_data")
|
|
bot_guilds = cursor.fetchall()
|
|
|
|
bot_guild_ids = {str(row['guild_id']) for row in bot_guilds} # Set mit Gilden-IDs, wo der Bot ist
|
|
|
|
# Speichern oder Aktualisieren der Gildeninformationen in der Datenbank, wenn der Bot auch auf dem Server ist
|
|
for guild in guilds:
|
|
guild_id = guild['id']
|
|
|
|
if guild_id in bot_guild_ids:
|
|
guild_name = guild['name']
|
|
guild_icon = guild.get('icon', None) # Manche Gilden haben kein Icon
|
|
owner_id = guild['owner_id']
|
|
|
|
# Prüfen, ob die Gilde bereits existiert
|
|
cursor.execute("SELECT COUNT(*) FROM guilds WHERE guild_id = %s", (guild_id,))
|
|
exists = cursor.fetchone()[0]
|
|
|
|
if exists:
|
|
# Gilde aktualisieren
|
|
cursor.execute("""
|
|
UPDATE guilds
|
|
SET name = %s, icon = %s, owner_id = %s
|
|
WHERE guild_id = %s
|
|
""", (guild_name, guild_icon, owner_id, guild_id))
|
|
else:
|
|
# Neue Gilde einfügen
|
|
cursor.execute("""
|
|
INSERT INTO guilds (guild_id, name, icon, owner_id)
|
|
VALUES (%s, %s, %s, %s)
|
|
""", (guild_id, guild_name, guild_icon, owner_id))
|
|
|
|
connection.commit()
|
|
cursor.close()
|
|
connection.close()
|
|
|
|
session['discord_guilds'] = [guild for guild in guilds if guild['id'] in bot_guild_ids] # Speichere nur die Gilden, wo der Bot ist, in der Session
|
|
|
|
return redirect(url_for("server_selection"))
|
|
|
|
except Exception as e:
|
|
print(f"Error in OAuth2 callback: {e}")
|
|
flash("Ein Fehler ist beim Authentifizierungsprozess aufgetreten.", "danger")
|
|
return redirect(url_for("landing_page"))
|
|
|
|
|
|
@app.route("/server_selection")
|
|
def server_selection():
|
|
"""Zeigt dem Benutzer eine Liste aller Server an, auf denen er sich befindet und die in der Datenbank vorhanden sind."""
|
|
if "discord_user" in session:
|
|
user_info = session["discord_user"]
|
|
user_id = user_info["id"]
|
|
|
|
connection = get_db_connection()
|
|
cursor = connection.cursor(dictionary=True)
|
|
|
|
# Abfrage der Gilden, auf denen der Benutzer in der Datenbank Einträge hat
|
|
cursor.execute("""
|
|
SELECT DISTINCT user_data.guild_id, guilds.name, guilds.icon
|
|
FROM user_data
|
|
JOIN guilds ON user_data.guild_id = guilds.guild_id
|
|
WHERE user_data.user_id = %s
|
|
""", (user_id,))
|
|
guilds = cursor.fetchall()
|
|
|
|
cursor.close()
|
|
connection.close()
|
|
|
|
return render_template("server_selection.html", guilds=guilds)
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/admin_dashboard")
|
|
def admin_dashboard():
|
|
"""Zeigt das Bot-Admin-Dashboard an (nur für globale Admins)."""
|
|
if is_bot_admin():
|
|
return render_template("admin_dashboard.html", bot_running=bot_status())
|
|
return redirect(url_for("landing_page"))
|
|
|
|
@app.route("/server_admin_dashboard/<int:guild_id>")
|
|
def server_admin_dashboard(guild_id):
|
|
"""Zeigt das Server-Admin-Dashboard an (nur für Server-Admins)."""
|
|
if is_server_admin(guild_id):
|
|
return render_template("server_admin_dashboard.html", guild_id=guild_id)
|
|
return redirect(url_for("landing_page"))
|
|
|
|
@app.route("/user_dashboard/<int:guild_id>")
|
|
def user_dashboard(guild_id):
|
|
"""Zeigt das User-Dashboard für einen spezifischen Server (guild_id) an."""
|
|
if "discord_user" in session:
|
|
user_info = session["discord_user"]
|
|
user_id = user_info["id"]
|
|
|
|
connection = get_db_connection()
|
|
cursor = connection.cursor(dictionary=True)
|
|
|
|
# Hole Benutzerinformationen für den ausgewählten Server
|
|
cursor.execute("SELECT * FROM user_data WHERE user_id = %s AND guild_id = %s", (user_id, guild_id))
|
|
user_data = cursor.fetchone()
|
|
|
|
cursor.close()
|
|
connection.close()
|
|
|
|
if user_data:
|
|
return render_template("user_dashboard.html", user_info=user_info, user_data=user_data, guild_id=guild_id)
|
|
else:
|
|
return "User data not found", 404
|
|
|
|
return redirect(url_for("login"))
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
"""Meldet den Benutzer ab."""
|
|
session.clear()
|
|
return redirect(url_for("landing_page"))
|
|
|
|
# Bot Management Routes
|
|
@app.route("/start_bot")
|
|
def start():
|
|
if is_bot_admin():
|
|
start_bot()
|
|
return redirect(url_for("admin_dashboard"))
|
|
return redirect(url_for("landing_page"))
|
|
|
|
@app.route("/stop_bot")
|
|
def stop():
|
|
if is_bot_admin():
|
|
stop_bot()
|
|
return redirect(url_for("admin_dashboard"))
|
|
return redirect(url_for("landing_page"))
|
|
|
|
if __name__ == "__main__":
|
|
app.run(host="0.0.0.0", port=5000, debug=True)
|