# Listing metadata: 226 lines, 7.7 KiB, Python
# Package metadata for the Discord bot web panel.
__version__ = "dev-0.4.6"
# Human-readable project title. It used to be stored in ``__all__``, but
# ``__all__`` must list importable attribute names: a title string there makes
# ``from <module> import *`` raise AttributeError.
__title__ = "Discordbot-chatai-webpanel (Discord)"
# Public API of this module: the Flask application object defined below.
__all__ = ["app"]
__author__ = "SimolZimol"
|
|
|
|
from flask import Flask, render_template, redirect, url_for, request, session, jsonify, flash
|
|
from requests_oauthlib import OAuth2Session
|
|
import os
|
|
import mysql.connector
|
|
from datetime import datetime
|
|
import subprocess
|
|
|
|
# The Flask application object; routes below register themselves on it.
app = Flask(__name__)
# Key used to sign the session cookie, read from the environment.
# NOTE(review): this is None when FLASK_SECRET_KEY is unset - confirm the
# deployment always provides it, since the routes below rely on `session`.
app.secret_key = os.environ.get("FLASK_SECRET_KEY")
|
|
|
|
# DB configuration
DB_HOST = os.getenv("DB_HOST")
# Environment variables are always strings (or missing). Parse the port once
# here and fall back to MySQL's default port 3306 when DB_PORT is unset or
# empty, so the connector never receives ``None`` or an empty string.
DB_PORT = int(os.getenv("DB_PORT") or 3306)
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_DATABASE")

# Discord OAuth2 configuration
DISCORD_CLIENT_ID = os.getenv("DISCORD_CLIENT_ID")
DISCORD_CLIENT_SECRET = os.getenv("DISCORD_CLIENT_SECRET")
DISCORD_REDIRECT_URI = os.getenv("DISCORD_REDIRECT_URI")
DISCORD_OAUTH2_URL = "https://discord.com/api/oauth2/authorize"
DISCORD_TOKEN_URL = "https://discord.com/api/oauth2/token"
DISCORD_API_URL = "https://discord.com/api/users/@me"
# NOTE(review): this switches off oauthlib's HTTPS requirement for the whole
# process. Presumably needed because the panel is served over plain HTTP
# (e.g. behind a TLS-terminating proxy) - confirm, and remove it if the
# redirect URI is always https.
os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'

# Global Bot Management
# Handle of the bot subprocess started by this panel; None until it is started.
bot_process = None
|
|
|
|
def get_db_connection():
    """Open and return a new MySQL connection.

    The connection parameters come from the module-level ``DB_*`` settings.
    The connection is returned open; closing it is left to the caller.
    """
    connect_kwargs = {
        "host": DB_HOST,
        "port": DB_PORT,
        "user": DB_USER,
        "password": DB_PASS,
        "database": DB_NAME,
    }
    return mysql.connector.connect(**connect_kwargs)
|
|
|
|
def make_discord_session(token=None, state=None):
    """Build an ``OAuth2Session`` for this panel's Discord application.

    Args:
        token: Optional token passed through to the session.
        state: Optional OAuth2 ``state`` value passed through to the session.

    Returns:
        An ``OAuth2Session`` bound to the configured client id and redirect
        URI, requesting only the ``identify`` scope.
    """
    session_options = {
        "token": token,
        "state": state,
        "redirect_uri": DISCORD_REDIRECT_URI,
        "scope": ["identify"],
    }
    return OAuth2Session(DISCORD_CLIENT_ID, **session_options)
|
|
|
|
def bot_status():
    """Report whether the bot subprocess is currently alive.

    Returns:
        True when a process handle is stored and it has not exited yet,
        False otherwise.
    """
    process = bot_process
    # poll() returns None while the child is still running.
    is_running = process is not None and process.poll() is None
    return is_running
|
|
|
|
def start_bot():
    """Launch ``bot.py`` as a child process unless one is already running.

    The new process handle is stored in the module-level ``bot_process``.
    """
    global bot_process
    if bot_status():
        print("Bot is already running.")
        return
    # NOTE(review): "python" is resolved via PATH and "bot.py" relative to the
    # current working directory - confirm both match the deployment.
    bot_process = subprocess.Popen(["python", "bot.py"])
|
|
|
|
def stop_bot(timeout=10):
    """Stop the bot subprocess if it is running.

    The process is first asked to exit with ``terminate()``. If it has not
    exited after ``timeout`` seconds it is killed, so a bot that ignores the
    termination request can no longer block the caller forever.

    Args:
        timeout: Seconds to wait for a graceful exit before killing the
            process. Defaults to 10.
    """
    global bot_process
    process = bot_process
    # Same liveness rule as bot_status(): a handle exists and poll() is None.
    if process is None or process.poll() is not None:
        print("Bot is not running.")
        return

    process.terminate()
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Graceful shutdown did not happen in time; force it and reap the child.
        process.kill()
        process.wait()
    bot_process = None
|
|
|
|
def is_global_admin():
    """Check whether the logged-in user has global admin privileges.

    Looks up ``global_permission`` for the session's Discord user in the
    ``bot_data`` table.

    Returns:
        True when a row exists and its ``global_permission`` is at least 8;
        False when nobody is logged in, the session user has no id, no row
        exists, or the stored permission is NULL.
    """
    user_id = (session.get("discord_user") or {}).get("id")
    if user_id is None:
        return False

    connection = get_db_connection()
    try:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute("SELECT global_permission FROM bot_data WHERE user_id = %s", (user_id,))
            row = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        connection.close()

    permission = row["global_permission"] if row else None
    # A NULL permission cannot be compared with an int; treat it as "no access".
    return permission is not None and permission >= 8
|
|
|
|
def is_server_admin(guild_id):
    """Check whether the logged-in user is an admin of the given server.

    Looks up ``permission`` for the session's Discord user and ``guild_id``
    in the ``user_data`` table.

    Args:
        guild_id: Identifier of the guild to check.

    Returns:
        True when a row exists and its ``permission`` is at least 8; False
        when nobody is logged in, the session user has no id, no row exists,
        or the stored permission is NULL.
    """
    user_id = (session.get("discord_user") or {}).get("id")
    if user_id is None:
        return False

    connection = get_db_connection()
    try:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute(
                "SELECT permission FROM user_data WHERE user_id = %s AND guild_id = %s",
                (user_id, guild_id),
            )
            row = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        connection.close()

    permission = row["permission"] if row else None
    # A NULL permission cannot be compared with an int; treat it as "no access".
    return permission is not None and permission >= 8
|
|
|
|
@app.route("/")
def landing_page():
    """Render the landing page template."""
    page = render_template("landing.html")
    return page
|
|
|
|
@app.route("/login")
def login():
    """Start the Discord OAuth2 login flow.

    Stores the generated OAuth2 state in the session and redirects the
    browser to Discord's authorization URL.
    """
    oauth = make_discord_session()
    auth_url, oauth_state = oauth.authorization_url(DISCORD_OAUTH2_URL)
    # Kept in the session so the callback can pass it back to the OAuth session.
    session["oauth_state"] = oauth_state
    return redirect(auth_url)
|
|
|
|
@app.route("/callback")
def callback():
    """Process the Discord OAuth2 callback.

    Exchanges the authorization code for a token, loads the Discord user
    profile into the session and redirects global admins to the admin
    dashboard and everyone else to the server selection page.

    Requests that did not originate from ``/login`` (no stored state), that
    carry an OAuth ``error`` parameter, or whose profile lookup yields no user
    id are sent back to the landing page without logging anyone in.
    """
    # The state is single-use: take it out of the session right away.
    expected_state = session.pop("oauth_state", None)
    # Without a stored state the OAuth library has nothing to compare the
    # returned state against, so such callbacks must be rejected here.
    if expected_state is None or "error" in request.args:
        return redirect(url_for("landing_page"))

    discord = make_discord_session(state=expected_state)
    token = discord.fetch_token(
        DISCORD_TOKEN_URL,
        client_secret=DISCORD_CLIENT_SECRET,
        authorization_response=request.url,
    )
    user_info = discord.get(DISCORD_API_URL).json()
    # An error payload from the API has no "id"; do not store it as a user.
    if not isinstance(user_info, dict) or "id" not in user_info:
        return redirect(url_for("landing_page"))

    session['oauth_token'] = token
    session['discord_user'] = user_info

    connection = get_db_connection()
    try:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute("SELECT global_permission FROM bot_data WHERE user_id = %s", (user_info["id"],))
            global_user_data = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        connection.close()

    permission = global_user_data["global_permission"] if global_user_data else None
    if permission is not None and permission >= 8:
        return redirect(url_for("admin_dashboard"))
    return redirect(url_for("server_select"))
|
|
|
|
@app.route("/server_select")
def server_select():
    """Display the servers the logged-in user can select from.

    Lists the distinct ``guild_id`` values stored for the user in
    ``user_data``. Visitors who are not logged in are redirected to the login
    route.
    """
    if "discord_user" not in session:
        return redirect(url_for("login"))

    user_id = session["discord_user"]["id"]
    connection = get_db_connection()
    try:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute("SELECT DISTINCT guild_id FROM user_data WHERE user_id = %s", (user_id,))
            guilds = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        connection.close()

    return render_template("server_select.html", guilds=guilds)
|
|
|
|
@app.route("/server_dashboard/<int:guild_id>")
def server_dashboard(guild_id):
    """Display the server dashboard for the selected server.

    Args:
        guild_id: Identifier of the guild taken from the URL.

    Returns:
        The rendered dashboard with all ``user_data`` rows of the guild for
        server admins, a 403 response for logged-in non-admins, and a
        redirect to the login route for visitors who are not logged in.
    """
    if "discord_user" not in session:
        return redirect(url_for("login"))
    if not is_server_admin(guild_id):
        return "Access denied", 403

    connection = get_db_connection()
    try:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM user_data WHERE guild_id = %s", (guild_id,))
            user_data = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        connection.close()

    return render_template("server_dashboard.html", guild_id=guild_id, user_data=user_data)
|
|
|
|
@app.route("/admin_dashboard")
def admin_dashboard():
    """Render the global admin dashboard.

    Anyone who is not logged in or is not a global admin is redirected to the
    login route.
    """
    if "discord_user" not in session or not is_global_admin():
        return redirect(url_for("login"))
    running = bot_status()
    return render_template("admin_dashboard.html", bot_running=running)
|
|
|
|
@app.route("/start_bot")
def start():
    """Start the bot on behalf of a global admin.

    Non-admins are redirected to the login route; admins return to the admin
    dashboard after the start request.
    """
    # NOTE(review): this state-changing action is reachable via GET - consider
    # POST plus CSRF protection.
    if not is_global_admin():
        return redirect(url_for("login"))
    start_bot()
    return redirect(url_for("admin_dashboard"))
|
|
|
|
@app.route("/stop_bot")
def stop():
    """Stop the bot on behalf of a global admin.

    Non-admins are redirected to the login route; admins return to the admin
    dashboard after the stop request.
    """
    # NOTE(review): this state-changing action is reachable via GET - consider
    # POST plus CSRF protection.
    if not is_global_admin():
        return redirect(url_for("login"))
    stop_bot()
    return redirect(url_for("admin_dashboard"))
|
|
|
|
@app.route("/giveaways/<int:guild_id>")
def view_giveaways(guild_id):
    """Display the giveaways of the given guild to its server admins.

    Args:
        guild_id: Identifier of the guild taken from the URL.

    Returns:
        The rendered giveaway list for server admins; everyone else is
        redirected to the login route.
    """
    if not is_server_admin(guild_id):
        return redirect(url_for("login"))

    connection = get_db_connection()
    try:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute("SELECT * FROM giveaway_data WHERE guild_id = %s", (guild_id,))
            giveaways = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        connection.close()

    return render_template("giveaways.html", giveaways=giveaways, guild_id=guild_id)
|
|
|
|
@app.route("/user_dashboard/<int:guild_id>")
def user_dashboard(guild_id):
    """Display the logged-in user's own data for the selected guild.

    Args:
        guild_id: Identifier of the guild taken from the URL.

    Returns:
        The rendered user dashboard when a ``user_data`` row exists for the
        user and guild, a 404 response when it does not, and a redirect to
        the login route for visitors who are not logged in.
    """
    if "discord_user" not in session:
        return redirect(url_for("login"))

    user_id = session["discord_user"]["id"]
    connection = get_db_connection()
    try:
        cursor = connection.cursor(dictionary=True)
        try:
            cursor.execute(
                "SELECT * FROM user_data WHERE user_id = %s AND guild_id = %s",
                (user_id, guild_id),
            )
            user_data = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even when the query raises.
        connection.close()

    if not user_data:
        return "No data found", 404
    return render_template("user_dashboard.html", user_data=user_data)
|
|
|
|
if __name__ == "__main__":
    # The server listens on all interfaces. With debug mode on, the
    # interactive debugger lets any client that can reach the port execute
    # arbitrary code, so debug mode is opt-in through FLASK_DEBUG instead of
    # being hard-coded to True.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)
|